Java

Netty 入门

勤劳的小蜜蜂 · 12月27日 · 2019年

Netty 简介

Netty 是一款提供异步、时间驱动,用于构建高性能网络应用程序的架构,主要功能就是建立起两端的通信,是 RPC 的基础设施。Netty 是基于 NIO,并且参考 Reactor 设计模式来进行设计的。解决了很多 JDK 原生存在的问题,比如解决了原生 NIO 存在 Epoll Bug,并且加入了自己的特性,效率提高了很多

为什么是 Netty ?

  • 使用 JDK 自己的 NIO 需要了解太多概念,编程复杂
  • Netty 解决了 JDK NIO 很多Bug,比如空轮询
  • Netty 自带的拆包解包,异常检测等机制让你从 NIO 的繁重细节中脱离出来,让你只需要关心业务逻辑
  • 自带各种协议栈
  • Netty 社区活跃
  • Netty 已经历各大 RPC 框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大

Netty 核心组件介绍

Bootstrap & ServerBootstrap

这两个是引导类,都继承了 AbstractBootstrap 类。

作用:将各个组件进行组装,比如 Channel、EventLoopGroup、ChannelHandler 等

Channel

Channel 是一个接口,它是 Netty 网络操作抽象类,它除了定义了基本的 I/O 操作以外,还定义了注册、绑定、连接等方法,并且还定义了和 Netty 相关的方法,比如获取该 Channel 的 EventLoop 等。

Channel 生命周期

  • 注册状态:当一个 Channel 注册到 EventLoop 上,ChannelInboundHandler 中的 channelRegistered 方法将会在注册时被调用
  • 未注册状态:Channel 没有注册在 EventLoop 上,当一个 Channel 从它的 EventLoop 上解除注册,ChannelInboundHandler 中的 channelUnregistered 方法将会被调用
  • 活跃状态:Channel 时连接/绑定、就绪的,则是活跃状态,当 Channel 变成活跃状态时 ChannelInBoundHandler 中的 channelActive 被调用
  • 为活跃状态:不在连接到某个远端时处于未活跃状态,当 Channel 离开活跃状态,ChannelInboundHandler 中的 channelInactive 被调用

ChannelFuture

ChannelFuture 是在 JDK 的 Future 基础上进行改造的。Netty 的所有 I/O都会返回一个 ChannFuture 对象,所有操作都不会阻塞,I/O操作是否完成,执行结果等信息都可以在 ChannelFuture 中获取,这也说明了 Netty 支持全异步。而且还可以为 ChannelFuture 添加监听器 ChannelFutureListener 来提供通知机制,这样就可以知道操作何时完成,见下列:

ChannelFuture channelFuture = bootstrap.bind(IP, port).sync();
channelFuture.addListener(new ChannelFutureListener(){
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        System.out.println("连接完成");
    }
});

EventLoop & EventLoopGroup

Channel 可以注册在一个 EventLoop 上,当时间到来时,EventLoop 根据不同的事件来选择 ChannelHandler 进行处理。在一个 EventLoop 的生命周期中,只能和一个线程进行绑定,也就是 EventLoop 和 Thread 一一对应。

EventLoopGroup 可以理解为线程池

ChannelHandler

回调来处理事件,当一个回调被触发时,相关事件就会被一个 ChannelHandler 的实现处理。

ChannelHandler 有两个子接口ChannelInboundHandler 和 ChannelOutboundHandler

  • ChannelInboundHandler:定义了入站数据以及状态变化
  • ChannelOutboundHandler:定义了出站数据以及拦截所有出站的操作

生命周期:

  • handlerAdded:把 ChannelHandler 添加到 ChannelPipeline 的时候被调用
  • handlerRemoved:把 ChannelHandler 从 ChannelPipeline 移除的时候被调用

ChannelPipeline

一个客户端连接肯定会发送各类事件的请求,也就是说一个 Channel 内必然会有多个事件产生,会有多个不同的 Handler 实现来响应对应的事件。这就需要用到 ChannelPipeline,可以把 ChannelPipeline 认为是处理 ChannelHandler 的链,ChannelPipeline 管理着该通道的所有 ChannelHandler,包括 ChannelHandler 的生命周期以及执行顺序,还提供了操作 ChannelHandler 的丰富方法,比如添加、删除或者替换 ChannelHandler,ChannelPipeline 还有丰富的 API 可以用来影响出站和进站事件。

每个 Channel 在创建的时候都会创建一个 ChannelPipeline。Channel 和 ChannelPipeline 是一一对应的,在整个 Channel 的生命周期里面都不会改变。

线程模型

实战:客户端与服务端双向通信

引入依赖

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.6.Final</version>
</dependency>

服务端

/** 
* NettyServer.java
*/
public class NettyServer {

    private static final int PORT = 8000;

    public static void main(String[] args) {
        NioEventLoopGroup boosGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        final ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(boosGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new FirstServerHandler());
                    }
                });


        bind(serverBootstrap, PORT);
    }

    private static void bind(final ServerBootstrap serverBootstrap, final int port) {
        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("端口[" + port + "]绑定成功!");
            } else {
                System.err.println("端口[" + port + "]绑定失败!");
            }
        });
    }
}
/** 
* FirstServerHandler.java
*/
public class FirstServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;

        System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(Charset.forName("utf-8")));

        // 回复数据到客户端
        System.out.println(new Date() + ": 服务端写出数据");
        ByteBuf out = getByteBuf(ctx);
        ctx.channel().writeAndFlush(out);
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        byte[] bytes = "你好,Netty Cilent!".getBytes(Charset.forName("utf-8"));

        ByteBuf buffer = ctx.alloc().buffer();

        buffer.writeBytes(bytes);

        return buffer;
    }
}

客户端

/** 
* NettyClient.java
*/
public class NettyClient {
    private static final int MAX_RETRY = 5;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8000;


    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new FirstClientHandler());
                    }
                });

        connect(bootstrap, HOST, PORT, MAX_RETRY);
    }

    private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
        bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("连接成功!");
            } else if (retry == 0) {
                System.err.println("重试次数已用完,放弃连接!");
            } else {
                // 第几次重连
                int order = (MAX_RETRY - retry) + 1;
                // 本次重连的间隔
                int delay = 1 << order;
                System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
                bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
                        .SECONDS);
            }
        });
    }
}
/** 
* FirstClientHandler.java
*/
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println(new Date() + ": 客户端写出数据");

        // 1.获取数据
        ByteBuf buffer = getByteBuf(ctx);

        // 2.写数据
        ctx.channel().writeAndFlush(buffer);
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        byte[] bytes = "你好,Netty Server!".getBytes(Charset.forName("utf-8"));

        ByteBuf buffer = ctx.alloc().buffer();

        buffer.writeBytes(bytes);

        return buffer;
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;

        System.out.println(new Date() + ": 客户端读到数据 -> " + byteBuf.toString(Charset.forName("utf-8")));
    }
}

效果

先启动服务器,再启动客户端

0 条回应