上篇文章讲解了自定义通信协议,本章节介绍如何支持多种协议。

会构建一个Server,同时支持Cat,Dog和People通信协议。有二种实现方式:

第一种方式利用了自定义协议,传递消息的时候,对消息的前几位(比如2位)进行自定义的位置(比如AB)解码器解析的时候前二位为AB表示一种协议类型,CD一种协议类型。这种方式没有利用protobuf,而是直接使用Netty自定义协议来解决的方案。

第二种方式使用protobuf来实现,实际上是对消息的定义方式进行规定,因为netty本身,客户端和服务器端建立的是一条TCP连接,一方必须要判断对方发送过来的对象是什么类型。

Protocol Buffers实现netty的多种传输协议

我们知道使用Protocol Buffers首先定义一个.proto文件

定义一个最外层的消息,最外层的消息(MyMessage)包含了所有传递的消息类型,所有的消息类型嵌套在最外层的消息类型中,每次传递都将传递具体消息类型(以最外层消息类型的枚举类型传递)

syntax ="proto2";

package com.zhihao.miao.netty.sixthexample;

option optimize_for = SPEED;

option java_package = "com.zhihao.miao.netty.seventhexample";

option java_outer_classname="MyDataInfo";

message MyMessage {

enum DataType{

PeopleType = 1;

DogType = 2;

CatType = 3;

}

required DataType data_type = 1;

//oneof的意思:如果有多个可选字段,在某一个时刻只能只有一个值被设置,可以节省内存空间

oneof dataBody {

People people = 2;

Dog dog = 3;

Cat cat = 4;

}

}

message People{

optional string name = 1;

optional int32 age = 2;

optional string address = 3;

}

message Dog{

optional string name = 1;

optional string age = 2;

}

message Cat{

optional string name = 1;

optional string city = 2;

}

使用编译器编译生成代码

protoc --java_out=src/main/java src/protobuf/People.proto

关于proto协议中的Oneof含义,如果有多个可选字段,在某一个时刻只能只有一个值被设置,官方链接,生成MyDataInfo类,类代码太多,这边不贴出了

服务端代码:

package com.zhihao.miao.netty.seventhexample;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.logging.LogLevel;

import io.netty.handler.logging.LoggingHandler;

public class TestServer {

public static void main(String[] args) throws Exception{

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup wokerGroup = new NioEventLoopGroup();

try{

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class)

.handler(new LoggingHandler(LogLevel.INFO))

.childHandler(new TestServerInitializer());

ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();

channelFuture.channel().closeFuture().sync();

}finally {

bossGroup.shutdownGracefully();

wokerGroup.shutdownGracefully();

}

}

}

服务端初始化链接:

package com.zhihao.miao.netty.seventhexample;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.protobuf.ProtobufDecoder;

import io.netty.handler.codec.protobuf.ProtobufEncoder;

import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;

import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class TestServerInitializer extends ChannelInitializer{

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new ProtobufVarint32FrameDecoder());

//使用最外层的消息实例

pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));

pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());

pipeline.addLast(new ProtobufEncoder());

pipeline.addLast(new TestServerHandler());

}

}

其实实现的关键就在于此,使用MyDataInfo.MyMessage实列(MyDataInfo.MyMessage是枚举类型),而我们定义的三种对象刚好就是其枚举对象

自定义的服务端的Handler,根据通道中传递数据的不同dataType值来解析程具体的类型:

package com.zhihao.miao.netty.seventhexample;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

public class TestServerHandler extends SimpleChannelInboundHandler {

@Override

protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {

MyDataInfo.MyMessage.DataType dataType = msg.getDataType();

if(dataType == MyDataInfo.MyMessage.DataType.PeopleType){

MyDataInfo.People people = msg.getPeople();

System.out.println(people.getName());

System.out.println(people.getAge());

System.out.println(people.getAddress());

}else if(dataType == MyDataInfo.MyMessage.DataType.DogType){

MyDataInfo.Dog dog = msg.getDog();

System.out.println(dog.getName());

System.out.println(dog.getAge());

}else if(dataType == MyDataInfo.MyMessage.DataType.CatType){

MyDataInfo.Cat cat = msg.getCat();

System.out.println(cat.getName());

System.out.println(cat.getCity());

}

}

}

客户端代码:

package com.zhihao.miao.netty.seventhexample;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

public class TestClient {

public static void main(String[] args) throws Exception{

EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try{

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)

.handler(new TestClientInitializer());

ChannelFuture channelFuture = bootstrap.connect("localhost",8888).sync();

channelFuture.channel().closeFuture().sync();

}finally {

eventLoopGroup.shutdownGracefully();

}

}

}

客户端的初始化链接:

package com.zhihao.miao.netty.seventhexample;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.protobuf.ProtobufDecoder;

import io.netty.handler.codec.protobuf.ProtobufEncoder;

import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;

import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class TestClientInitializer extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new ProtobufVarint32FrameDecoder());

//使用最外层的消息实例

pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));

pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());

pipeline.addLast(new ProtobufEncoder());

pipeline.addLast(new TestClientHandler());

}

}

自定义处理器端的handler,随机发送不同协议的数据:

package com.zhihao.miao.netty.seventhexample;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Random;

public class TestClientHandler extends SimpleChannelInboundHandler {

@Override

protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {

}

//客户端像服务器端发送数据

public void channelActive(ChannelHandlerContext ctx) throws Exception {

int randomInt = new Random().nextInt(3);

MyDataInfo.MyMessage myMessage = null;

if(0 == randomInt){

myMessage = MyDataInfo.MyMessage.newBuilder().

setDataType(MyDataInfo.MyMessage.DataType.PeopleType).

setPeople(MyDataInfo.People.newBuilder().setName("张三").

setAddress("上海").setAge(26).build()).build();

}else if(1 == randomInt){

myMessage = MyDataInfo.MyMessage.newBuilder().

setDataType(MyDataInfo.MyMessage.DataType.DogType).

setDog(MyDataInfo.Dog.newBuilder().setName("旺财")

.setAge("2").build()).build();

}else if(2 == randomInt){

myMessage = MyDataInfo.MyMessage.newBuilder().

setDataType(MyDataInfo.MyMessage.DataType.CatType).

setCat(MyDataInfo.Cat.newBuilder().setName("汤姆")

.setCity("上海").build()).build();

}

ctx.channel().writeAndFlush(myMessage);

}

}

启动服务器端,然后启动客户端多执行几次,服务器的控制台显示:

七月 05, 2017 10:10:37 下午 io.netty.handler.logging.LoggingHandler channelRead

信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x82a26e9f, L:/127.0.0.1:8888 - R:/127.0.0.1:51777]

七月 05, 2017 10:10:37 下午 io.netty.handler.logging.LoggingHandler channelReadComplete

信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE

汤姆

上海

七月 05, 2017 10:11:38 下午 io.netty.handler.logging.LoggingHandler channelRead

信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x128da3e7, L:/127.0.0.1:8888 - R:/127.0.0.1:52049]

七月 05, 2017 10:11:38 下午 io.netty.handler.logging.LoggingHandler channelReadComplete

信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE

张三

26

上海

七月 05, 2017 10:11:49 下午 io.netty.handler.logging.LoggingHandler channelRead

信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xa8220c73, L:/127.0.0.1:8888 - R:/127.0.0.1:52097]

七月 05, 2017 10:11:49 下午 io.netty.handler.logging.LoggingHandler channelReadComplete

信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE

汤姆

上海

七月 05, 2017 10:11:55 下午 io.netty.handler.logging.LoggingHandler channelRead

信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x9ac52ec1, L:/127.0.0.1:8888 - R:/127.0.0.1:52125]

七月 05, 2017 10:11:55 下午 io.netty.handler.logging.LoggingHandler channelReadComplete

信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE

张三

26

上海

七月 05, 2017 10:12:07 下午 io.netty.handler.logging.LoggingHandler channelRead

信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x797d03b6, L:/127.0.0.1:8888 - R:/127.0.0.1:52178]

七月 05, 2017 10:12:07 下午 io.netty.handler.logging.LoggingHandler channelReadComplete

信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE

旺财

2

使用netty实现多种传输协议

官网类似的demo,自己写了很长也参考了官网才写出这个demo,对netty的理解又加深了:

三种协议实体类:

Person协议

package com.zhihao.miao.test.day10;

public class Person {

private String username;

private int age;

//get set方法

}

Dog协议

package com.zhihao.miao.test.day10;

public class Dog {

private String name;

private String age;

//get set方法

}

Cat协议

package com.zhihao.miao.test.day10;

public class Cat {

private String name;

private String city;

//get set方法

}

服务端:

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.logging.LogLevel;

import io.netty.handler.logging.LoggingHandler;

public class MultiServer {

public static void main(String args[]) throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();

// 指定socket的一些属性

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);

serverBootstrap.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class) // 指定是一个NIO连接通道

.handler(new LoggingHandler(LogLevel.INFO))

.childHandler(new ServerChannelInitializer());

// 绑定对应的端口号,并启动开始监听端口上的连接

Channel ch = serverBootstrap.bind(8899).sync().channel();

// 等待关闭,同步端口

ch.closeFuture().sync();

}

}

服务器端初始化lInitializer

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

public class ServerChannelInitializer extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

//解析handler

pipeline.addLast(new ServlerDecoder());

pipeline.addLast(new TestServerHandler());

}

}

服务端解码器Handler,如果解析的位置数据是0则按照 Person协议进行解码,如果传递的位置数据是1,则按照Dog协议进行解码,如果传递的位置数据是2,则按照Cat协议进行解码:

public class ServlerDecoder extends ByteToMessageDecoder {

@Override

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {

int flag = in.readInt();

if(flag == 0){

int usernamelength = in.readInt();

byte[] usernamebys = new byte[usernamelength];

in.readBytes(usernamebys);

String username = new String(usernamebys);

int age = in.readInt();

Person pserson = new Person();

pserson.setUsername(username);

pserson.setAge(age);

out.add(pserson);

}

if(flag ==1){

int namelength =in.readInt();

byte[] namebys = new byte[namelength];

in.readBytes(namebys);

String name = new String(namebys);

byte[] agebys = new byte[in.readableBytes()];

in.readBytes(agebys);

String age = new String(agebys);

Dog dog = new Dog();

dog.setName(name);

dog.setAge(age);

out.add(dog);

}

if(flag ==2){

int namelength = in.readInt();

byte[] nameByte = new byte[namelength];

in.readBytes(nameByte);

String name = new String(nameByte);

byte[] colorbys = new byte[in.readableBytes()];

in.readBytes(colorbys);

String color = new String(colorbys);

Cat cat = new Cat();

cat.setName(name);

cat.setColor(color);

out.add(cat);

}

}

自定义服务器端Handler:

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

public class TestServerHandler extends SimpleChannelInboundHandler {

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

if(msg instanceof Person){

System.out.println(((Person) msg).getUsername());

System.out.println(((Person) msg).getAge());

}

if(msg instanceof Dog){

System.out.println(((Dog) msg).getName());

System.out.println(((Dog) msg).getAge());

}

if(msg instanceof Cat){

System.out.println(((Cat) msg).getName());

System.out.println(((Cat) msg).getColor());

}

}

}

客户端:

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.Channel;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

public class MultiClient {

public static void main(String[] args) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap b = new Bootstrap();

b.group(group).channel(NioSocketChannel.class).handler(new ClientChannelInitializer());

// Start the connection attempt.

Channel ch = b.connect("127.0.0.1", 8899).sync().channel();

ch.flush();

}

}

客户端初始化Initializer

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import java.util.Random;

public class ClientChannelInitializer extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

int randomInt = new Random().nextInt(3);

/**

* 编码动作,如果随机参数是1,则传输Person协议,如果随机参数是2,则传递Dog协议,

* 如果随机参数是3,则传递Cat协议

*

* Person协议就是传递一个标识位为0,然后将Person对象编码成二进制传输

* Dog协议传递一个标识位为1,然后将Dog对象编码成二进制进行传输

* Cat协议传递一个标识为2,然后将Cat对象编码成二进制进行传输

*/

if(0 == randomInt){

pipeline.addLast(new PersonEncoder());

Person person = new Person();

person.setUsername("zhihao");

person.setAge(27);

pipeline.addLast(new TestClientHandler(person));

}

if(1 == randomInt){

pipeline.addLast(new DogEncoder());

Dog dog = new Dog();

dog.setName("wangcai");

dog.setAge("2");

pipeline.addLast(new TestClientHandler(dog));

}

if(2 == randomInt){

pipeline.addLast(new CatEncoder());

Cat cat = new Cat();

cat.setName("maomi");

cat.setColor("yellow");

pipeline.addLast(new TestClientHandler(cat));

}

}

}

三种自定义编码协议,与服务器端进行对应传输Person数据的时候,在Person数据之前加上标识位置数据0,在Dog数据之前加上标识位置数据1,在Cat数据之前加上标识位置数据2,然后将其与本身的数据一起编码成二进制进行传输。

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

public class PersonEncoder extends MessageToByteEncoder {

@Override

protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {

String username = msg.getUsername();

int usernamelength = username.length();

int age = msg.getAge();

out.writeInt(0); //标识位

out.writeInt(usernamelength);

out.writeBytes(username.getBytes());

out.writeInt(age);

}

}

Dog协议编码器

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

public class DogEncoder extends MessageToByteEncoder {

@Override

protected void encode(ChannelHandlerContext ctx, Dog msg, ByteBuf out) throws Exception {

String name = msg.getName();

int namelength = name.length();

String age = msg.getAge();

out.writeInt(1); //标识位

out.writeInt(namelength);

out.writeBytes(name.getBytes());

out.writeBytes(age.getBytes());

}

}

Cat协议编码器:

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

public class CatEncoder extends MessageToByteEncoder {

@Override

protected void encode(ChannelHandlerContext ctx, Cat msg, ByteBuf out) throws Exception {

String name = msg.getName();

int namelength = name.length();

String color = msg.getColor();

out.writeInt(2); //标识位

out.writeInt(namelength);

out.writeBytes(name.getBytes());

out.writeBytes(color.getBytes());

}

}

自定义客户端处理器:

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

public class TestClientHandler extends ChannelInboundHandlerAdapter {

private Person person;

private Cat cat;

private Dog dog;

public TestClientHandler(Person person){

this.person = person;

}

public TestClientHandler(Dog dog){

this.dog = dog;

}

public TestClientHandler(Cat cat){

this.cat =cat;

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

if(person != null){

ctx.channel().writeAndFlush(person);

}

if(dog != null){

ctx.channel().writeAndFlush(dog);

}

if(cat != null){

ctx.channel().writeAndFlush(cat);

}

}

}

启动服务端,再多次启动客户端,服务器控制台打印出不同协议传输的结果

maomi

yellow

十月 15, 2017 4:33:43 下午 io.netty.handler.logging.LoggingHandler channelRead

信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0xf40f7b07, L:/127.0.0.1:8899 - R:/127.0.0.1:57879]

十月 15, 2017 4:33:43 下午 io.netty.handler.logging.LoggingHandler channelReadComplete

信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE

wangcai

2

十月 15, 2017 4:33:48 下午 io.netty.handler.logging.LoggingHandler channelRead

信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x3384f158, L:/127.0.0.1:8899 - R:/127.0.0.1:57914]

十月 15, 2017 4:33:48 下午 io.netty.handler.logging.LoggingHandler channelReadComplete

信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE

zhihao

27

demo链接

Previous Post 拣大白菜贴士