Netty 简介
Netty 是一款提供异步、时间驱动,用于构建高性能网络应用程序的架构,主要功能就是建立起两端的通信,是 RPC 的基础设施。Netty 是基于 NIO,并且参考 Reactor 设计模式来进行设计的。解决了很多 JDK 原生存在的问题,比如解决了原生 NIO 存在 Epoll Bug,并且加入了自己的特性,效率提高了很多
为什么是 Netty ?
- 使用 JDK 自己的 NIO 需要了解太多概念,编程复杂
- Netty 解决了 JDK NIO 很多Bug,比如空轮询
- Netty 自带的拆包解包,异常检测等机制让你从 NIO 的繁重细节中脱离出来,让你只需要关心业务逻辑
- 自带各种协议栈
- Netty 社区活跃
- Netty 已经历各大 RPC 框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大
Netty 核心组件介绍
Bootstrap & ServerBootstrap
这两个是引导类,都继承了 AbstractBootstrap 类。
作用:将各个组件进行组装,比如 Channel、EventLoopGroup、ChannelHandler 等
Channel
Channel 是一个接口,它是 Netty 网络操作抽象类,它除了定义了基本的 I/O 操作以外,还定义了注册、绑定、连接等方法,并且还定义了和 Netty 相关的方法,比如获取该 Channel 的 EventLoop 等。
Channel 生命周期
- 注册状态:当一个 Channel 注册到 EventLoop 上,ChannelInboundHandler 中的 channelRegistered 方法将会在注册时被调用
- 未注册状态:Channel 没有注册在 EventLoop 上,当一个 Channel 从它的 EventLoop 上解除注册,ChannelInboundHandler 中的 channelUnregistered 方法将会被调用
- 活跃状态:Channel 时连接/绑定、就绪的,则是活跃状态,当 Channel 变成活跃状态时 ChannelInBoundHandler 中的 channelActive 被调用
- 为活跃状态:不在连接到某个远端时处于未活跃状态,当 Channel 离开活跃状态,ChannelInboundHandler 中的 channelInactive 被调用
ChannelFuture
ChannelFuture 是在 JDK 的 Future 基础上进行改造的。Netty 的所有 I/O都会返回一个 ChannFuture 对象,所有操作都不会阻塞,I/O操作是否完成,执行结果等信息都可以在 ChannelFuture 中获取,这也说明了 Netty 支持全异步。而且还可以为 ChannelFuture 添加监听器 ChannelFutureListener 来提供通知机制,这样就可以知道操作何时完成,见下列:
ChannelFuture channelFuture = bootstrap.bind(IP, port).sync();
channelFuture.addListener(new ChannelFutureListener(){
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("连接完成");
}
});
EventLoop & EventLoopGroup
Channel 可以注册在一个 EventLoop 上,当时间到来时,EventLoop 根据不同的事件来选择 ChannelHandler 进行处理。在一个 EventLoop 的生命周期中,只能和一个线程进行绑定,也就是 EventLoop 和 Thread 一一对应。
EventLoopGroup 可以理解为线程池
ChannelHandler
回调来处理事件,当一个回调被触发时,相关事件就会被一个 ChannelHandler 的实现处理。
ChannelHandler 有两个子接口ChannelInboundHandler 和 ChannelOutboundHandler
- ChannelInboundHandler:定义了入站数据以及状态变化
- ChannelOutboundHandler:定义了出站数据以及拦截所有出站的操作
生命周期:
- handlerAdded:把 ChannelHandler 添加到 ChannelPipeline 的时候被调用
- handlerRemoved:把 ChannelHandler 从 ChannelPipeline 移除的时候被调用
ChannelPipeline
一个客户端连接肯定会发送各类事件的请求,也就是说一个 Channel 内必然会有多个事件产生,会有多个不同的 Handler 实现来响应对应的事件。这就需要用到 ChannelPipeline,可以把 ChannelPipeline 认为是处理 ChannelHandler 的链,ChannelPipeline 管理着该通道的所有 ChannelHandler,包括 ChannelHandler 的生命周期以及执行顺序,还提供了操作 ChannelHandler 的丰富方法,比如添加、删除或者替换 ChannelHandler,ChannelPipeline 还有丰富的 API 可以用来影响出站和进站事件。

每个 Channel 在创建的时候都会创建一个 ChannelPipeline。Channel 和 ChannelPipeline 是一一对应的,在整个 Channel 的生命周期里面都不会改变。
线程模型

实战:客户端与服务端双向通信
引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
服务端
/**
* NettyServer.java
*/
public class NettyServer {
private static final int PORT = 8000;
public static void main(String[] args) {
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new FirstServerHandler());
}
});
bind(serverBootstrap, PORT);
}
private static void bind(final ServerBootstrap serverBootstrap, final int port) {
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
}
});
}
}
/**
* FirstServerHandler.java
*/
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(Charset.forName("utf-8")));
// 回复数据到客户端
System.out.println(new Date() + ": 服务端写出数据");
ByteBuf out = getByteBuf(ctx);
ctx.channel().writeAndFlush(out);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "你好,Netty Cilent!".getBytes(Charset.forName("utf-8"));
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
}
客户端
/**
* NettyClient.java
*/
public class NettyClient {
private static final int MAX_RETRY = 5;
private static final String HOST = "127.0.0.1";
private static final int PORT = 8000;
public static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new FirstClientHandler());
}
});
connect(bootstrap, HOST, PORT, MAX_RETRY);
}
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else if (retry == 0) {
System.err.println("重试次数已用完,放弃连接!");
} else {
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
.SECONDS);
}
});
}
}
/**
* FirstClientHandler.java
*/
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客户端写出数据");
// 1.获取数据
ByteBuf buffer = getByteBuf(ctx);
// 2.写数据
ctx.channel().writeAndFlush(buffer);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "你好,Netty Server!".getBytes(Charset.forName("utf-8"));
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ": 客户端读到数据 -> " + byteBuf.toString(Charset.forName("utf-8")));
}
}
效果
先启动服务器,再启动客户端

