author = “pikachu” title = “Netty实战” date = “2022-07-24” description = " 简单netty通信案例 " draft = false tags = [ “netty” ] categories = [ “it”, “中间件” ]
Netty实战
个人demo:
大佬demo:
前言
这几天进行了情报部通信相关的开发,想要Netty加入到开发过程中,在整合到过程出现了一些难以解决的问题,情报板发送分为四个阶段:
- 发送删除旧的情报板文件请求
- 发送创建新的文件请求
- 发送文件内容
- 发送播放文件的请求
按照传统的IO模型,我们可以每次发送socket后阻塞等待返回,以确定是否进行下一步操作,但是采用Netty后我就有点懵了,因为Netty本身定位为非阻塞型IO,不知道如何进行阻塞某一个的结果并根据结果确定下一步执行,本次Demo通过Netty的Handler + Promise实现上述内容。
原来的情报板实现发送的逻辑如下:
- 接收到Kafka消息后,启动新线程去执行情报板发送
- 在发送情报板的Service类中,BIO Socket作为static静态变量进行通信(这意味着多线程下会共享同个Socket,数据的写入将会出现无法预期的混乱情况(并发写入Socket),并且发布情报板的四个阶段也会因为并发操作而出现问题。)
解决方案:
- 采用Netty NIO框架
- 单线程池替换new Thread的方式 -> 采用可缓存线程池(多线程还是需要开启,因为对多个情报板同时更新是完全没问题的,单线程无法保证其最大并发)
- 对情报板的四步操作加锁保证其原子性,并且为了提高并发,降低锁的粒度,对设备IP进行加锁,即保证同个情报板的更新是同步的。
疑惑:
- Netty的NIO在代码层面到底具体在哪部分?
目前的猜测,传统的IO操作,我们直接对socket操作,并耦合业务代码,这导致我们的工作线程需要阻塞等待IO,而Netty的业务我们可以封装到handler中,通过channel异步写入数据后,后续的业务操作由Netty专有线程池控制Handler进行操作,当前工作线程可以返回。由于情报板的更新有序性及操作原子性特性,所以需要对情报板加锁处理,保证当前操作完成后进行下一次的更新。
依赖
- 该springboot依赖包含netty-all依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
客户端
1. 客户端服务启动类
public class NettyClient {
public static void start() throws InterruptedException {
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
// promise用于获取业务的执行结果
EventExecutor eventExecutor = new DefaultEventExecutor();
Promise<Integer> promise = new DefaultPromise<>(eventExecutor);
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ClientDecoder());
pipeline.addLast(new ClientByteEncoder());
pipeline.addLast(new ClientHandler(promise));
}
});
// channel无法序列化,所以不能直接传输
Channel channel = bootstrap.connect("127.0.0.1", 8090).await().channel();
OperateDto operateDto = OperateDto.builder()
.operate("1")
.content("这是client的1")
.build();
channel.writeAndFlush(operateDto);
System.out.println("发送内容:" + operateDto);
try {
Integer integer = promise.await().get();
System.out.println("最终执行结果:" + integer);
} catch (ExecutionException e) {
System.out.println("最终执行结果:" + e.getCause().getMessage());
}
}
}
- 客户端Handler处理类
- 编码类:用于做客户端的编码工作,通常协议的构建就是在此类中进行,然后发送给服务端
public class ClientByteEncoder extends MessageToByteEncoder<OperateDto> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, OperateDto operateDto, ByteBuf byteBuf) throws Exception {
byte[] serialize = SerializationUtils.serialize(operateDto);
System.out.println("客户端encoder发送:" + Arrays.toString(serialize));
byteBuf.writeBytes(serialize);
}
}
- 解码类:用于接收和解析从服务端发送来的数据,校验及内容的提取就是在此类进行
public class ClientDecoder extends ByteToMessageDecoder {
// 第二个参数为in(from server),第三个参数为out(out to next handler)
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// ByteBuf byteBuf = channelHandlerContext.alloc().buffer();
// byteBuf用于存储通信内容
byte[] resp = new byte[byteBuf.readableBytes()];
// 将byteBuf的内容读取到resp数组中
byteBuf.readBytes(resp);
// 将读取到到byte数组进行反序列化操作
ResultDto resultDto = (ResultDto) SerializationUtils.deserialize(resp);
System.out.println("接收到服务器响应:" + resultDto);
// list即为out,加入后会传送给下一个handler
list.add(resultDto);
}
}
- 业务处理类:该类可以认为是Decoder类的下一层解析,Decoder解析协议并提取出内容,Handler将内容进行业务层相关的解析工作,并且只有当监听到相同类型的对象时才会有效,否则会忽略,例如下面的handler,需要监听到上游handler发送了ResultDto类型的数据才会响应。
public class ClientHandler extends SimpleChannelInboundHandler<ResultDto> {
/**
* 接收来自客户端的promise,当业务达到结束条件后,setSuccess或者setFailure,
* 客户端监听到结果后后进行相关处理,例如通过addListening异步监听,或者通过sync同步阻塞等待结果
*/
private final Promise<Integer> promise;
public ClientHandler(Promise<Integer> promise) {
this.promise = promise;
}
/**
* 将channel和promise的操作放在handler中,与service解耦
* promise:用于确定业务的执行结果,并将结果返回到client
* channel:用于服务端和客户端间到通信,向channel写入数据即可向对方通信
* ByteBuf:用于存储接收到的通信内容,通常需要从中读取内容并进行接下来的业务操作
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ResultDto resultDto) throws Exception {
ClientService clientService = SpringContextUtil.getBean(ClientService.class);
Channel channel = channelHandlerContext.channel();
// 将业务逻辑抽离到service层,获取业务执行结果
OperateDto operateDto = clientService.handleResult(resultDto);
channel.writeAndFlush(operateDto);
// 根据结果判定是否达到业务结束条件,将结果添加到promise中
String operate = operateDto.getOperate();
if ("3".equals(operate)) {
promise.setSuccess(10000);
// promise.setFailure(new Exception("出错啦"));
}
}
}
3. 客户端Service层业务处理
- 抽离出来的业务执行类,输入是服务器的响应内容,输出是处理后接下来的执行内容
@Service
public class ClientService {
public OperateDto handleResult(ResultDto resultDto) {
String flag = resultDto.getFlag();
OperateDto operateDto = OperateDto.builder().build();
switch (flag) {
case "1":
operateDto.setOperate("2");
operateDto.setContent("接收到1,开始执行2");
break;
case "2":
operateDto.setOperate("3");
operateDto.setContent("接收到2,开始执行3");
break;
}
return operateDto;
}
}
服务端
- 服务端启动类
public class NettyServer {
public static void start() {
NioEventLoopGroup bossExecutors = new NioEventLoopGroup();
NioEventLoopGroup workerExecutors = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossExecutors, workerExecutors)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ServerByteEncoder());
pipeline.addLast(new ServerByteDecoder());
pipeline.addLast(new ServerHandler());
}
});
serverBootstrap.bind(8090);
}
}
2. 服务端Handler处理类
- 服务端解码类
public class ServerByteDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
byte[] con = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(con);
OperateDto operateDto = (OperateDto) SerializationUtils.deserialize(con);
System.out.println("服务端decoder收到消息:" + operateDto);
list.add(operateDto);
}
}
- 服务端编码类
public class ServerByteEncoder extends MessageToByteEncoder<ResultDto> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ResultDto resultDto, ByteBuf byteBuf) throws Exception {
System.out.println("服务端响应处理结果:" + resultDto);
byteBuf.writeBytes(SerializationUtils.serialize(resultDto));
}
}
- 服务端处理类
public class ServerHandler extends SimpleChannelInboundHandler<OperateDto> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, OperateDto operateDto) {
System.out.println("server handler 接收到:" + operateDto);
ServerService serverService = SpringContextUtil.getBean(ServerService.class);
Channel channel = channelHandlerContext.channel();
ResultDto resultDto = serverService.operateClientReq(operateDto);
// 服务端可以通过channel跟客户端进行通信,每个客户端与服务端的连接都会有一个channel
channel.writeAndFlush(resultDto);
}
}
3. 服务端Service层业务处理
@Service
public class ServerService {
public ResultDto operateClientReq(OperateDto operateDto) {
String operate = operateDto.getOperate();
ResultDto resultDto = ResultDto.builder().build();
switch (operate) {
case "1":
resultDto.setFlag("1");
resultDto.setResult("这是server对你1的回复");
break;
case "2":
resultDto.setFlag("2");
resultDto.setResult("这是server回复你2的回复");
break;
}
return resultDto;
}
}
其他
- OperateDto
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OperateDto implements Serializable {
private String operate;
private String content;
}
- ResultDto
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ResultDto implements Serializable {
private String flag;
private String result;
}
- SpringContextUtil
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextUtil.applicationContext = applicationContext;
}
//获取applicationContext
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
//通过name获取 Bean.
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
//通过class获取Bean.
public static <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
//通过name,以及Clazz返回指定的Bean
public static <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}