Netty底层机制

一、NIO回顾与Netty简介

1. NIO

​ Java NIO : 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器Selector上,多路复用器轮询到连接有I/O请求就进行处理。NIO是面向缓冲区(块)编程的。

NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道

在这里插入图片描述

2. 原生NIO问题

​ 1)NIO的类库和API非常繁杂,使用不便,需要非常熟练掌握Selector、SocketChannel、ServerSocketChannel、ByteBuffer等等。

​ 2)因为NIO编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。

​ 3)开发工作量和难度都非常大,例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。

​ 4)JDK NIO 存在Bug:Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。

3. Netty简介

​ Netty官网对Netty的描述:Netty是一个异步的,基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients

Netty主要针对在TCP协议下,面向Clients端的高并发应用,本质是一个NIO框架,适用于服务器通讯相关的多种应用场景。

在这里插入图片描述

​ 在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。

4. Netty的优点

​ Netty 对JDK自带的 NIO 的 API 进行了封装。

​ 1)设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.

​ 2)高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。

​ 3)安全:完整的 SSL/TLS 和 StartTLS 支持。

二、Netty 高性能架构设计

1. 线程模型

​ 线程模式会对程序的性能有很大影响,为了明白Netty的线程模式,我们来系统的讲解下各个线程模式。

目前存在的线程模型有如下模型,而Netty线程模式,主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个 Reactor。

  • 传统阻塞I/O服务模型
  • Reactor模式
    • 单Reactor单线程
    • 单Reactor多线程
    • 主从Reactor多线程

2. 传统阻塞的I/O服务模型

每个请求都需要一个独立的线程完成 建立连接,数据的输入,业务处理,数据返回,黄色框代表是一个handler对象,蓝色是线程。

在这里插入图片描述

问题分析:

(1)当并发数很大时,由于每个请求都需要一个线程,则会占用很多系统资源。

(2)连接建立了,但如果没有进行数据读,则线程会阻塞在read操作,造成资源的浪费。

3. Reactor模型

​ 所以针对传统IO的两个问题,我们的解决方法是:

(1)通过基于I/O复用模型来实现多个请求共用一个阻塞对象ServiceHandler(只阻塞一个线程),客户端的应用程序只需要在一个阻塞对象等待,无需阻塞等待所有的请求。如果请求连接有数据传输和处理,OS会通知应用程序,线程由阻塞到开始进行业务处理。

(2)基于线程池复用线程资源:不必再为每个连接创建线程将连接完成后的业务处理任务分配给线程池中的线程进行处理,所以一个线程可以处理多个连接的业务,而不是绑定。

在这里插入图片描述

​ Reactor 模式使用IO复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键

1)Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。

2)Handlers:处理程序执行 I/O 事件要完成的实际事件,Reactor 通过调度适当的Handler来响应 I/O 事件,处理程序执行非阻塞操作。

​ 根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现

  1. 单Reactor单线程

    特点:Handler会处理掉完整的业务请求,前面的NIO属于这种例子,但是客户端连接太多,则无法支撑

    在这里插入图片描述

    • Select 是网络编程 API,可以实现应用程序通过一个阻塞对象监听多路连接请求,Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch进行分发。

    • 如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求(与Tomcat中EndPoint一致),然后创建一个 Handler 对象处理连接完成后的后续业务处理。

      如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应,Handler 会完成 Read→业务处理→Send的完整业务流程。

  2. 单Reactor多线程

    1. 特点:Handler现在只负责read读取数据,和send响应数据,会分发给后面Worker线程池中worker线程进行业务处理逻辑,然后返回给handler进行send返回给client。

    2.优缺点:可以利用多核cpu的处理能力,由于Reactor处理所有事件的监听和响应,多高并发下容易瓶颈,自己拉了跨,后面处理没问题。

    在这里插入图片描述

  3. 主从Reactor多线程

    所以针对上述瓶颈情况,可以让Reactor多线程(子SubReactor),并且连接加入到连接队列中监听,发生事件时才传给Handler进行处理,即使用Reactor主从线程。

在这里插入图片描述

  1. 通过MainReactor进行select监听连接,通过Acceptor处理连接事件,不是建立连接则将连接分配给SubReactor。
  2. SubReactor将连接加入到连接队列中进行监听,当事件发生时,调用对应的Handler进行处理。

4. Reactor模型优点

  • 响应快,不必为单个同步事件所阻塞,因为可以调用其他的SubReactor进行处理。
  • 避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销。
  • 方便扩展,增加Reactor子线程实例来利用CPU资源。

5. Netty模型

​ Netty 主要基于主从Reactor多线程模型做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor。

在这里插入图片描述

  • 对上图的说明:

(1)Netty抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写

(2)BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup,NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop

(3)NioEventLoop 表示一个不断循环(死循环)的执行处理任务的线程, 每个NioEventLoop 都有一个selector , 用于监听绑定在其上的socket的网络通讯,NioEventLoopGroup 可以有多个线程, 即可以含有多个NioEventLoop

  • 每个Boss NioEventLoop 循环执行的步骤有3步
  1. 轮询accept事件(判断是否有客户端请求连接)

  2. 处理accept事件 , 与client建立连接 , 将ServerSocketChannel生成NioServerSocketChannel , 并将其注册到某个worker中NioEventLoop 上的 selector

  3. 处理任务队列taskqueue的任务 , 即 runAllTasks

  • 每个 Worker NioEventLoop 循环执行的步骤
  1. 轮询read, write事件

  2. 处理i/o事件, 即read , write 事件,在对应注册的NioSocketChannel上进行处理(因为是数据读写,涉及到NIO)。

  3. 处理任务队列taskqueue的任务 , 即 runAllTasks

  4. 每个Worker NioEventLoop 处理业务时,会使用pipeline(管道,类型是ChannelPipeline),管道中维护了很多的处理器Handler,pipeline与channel又互相包含。

6. 任务队列TaskQueue

​ (1)为什么会添加任务到任务队列中?

当管道pipeline中的业务比较耗时,为了不阻塞管道,则进行异步执行。

​ (2)怎么添加?

让业务进行异步执行,提交到该channel对应的Eventloop的taskqueue中。

7. Netty模型总结

1) Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。

2) NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。

3) NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责

• NioEventLoopGroup 下包含多个 NioEventLoop

• 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue任务队列

• 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel

• 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上

• 每个 NioChannel 都绑定有一个自己的 ChannelPipeline

8. 异步模型

​ 异步即:当调用发出后,调用者不会立刻得到结果,实际处理这个调用的组件执行完后,再回调或者通知调用者。

Netty中的 IO操作 都是 异步IO,包括Bind、Write、Connect等,都会先返回一个ChannelFuture,后序可以通过Future-Listener机制,主动去监控方法的处理过程 或者 通知机制回调回来结果。

上述的监听功能由注册的监听器实现,常见有如下操作

•通过 isDone 方法来判断当前操作是否完成;

•通过 isSuccess 方法来判断已完成的当前操作是否成功;

•通过 getCause 方法来获取已完成的当前操作失败的原因;

•通过 isCancelled 方法来判断已完成的当前操作是否被取消;

通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则通知指定的监听器

//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();

//给cf 注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});

在这里插入图片描述

注:拦截操作和转换出入站数据只需要您提供 callback 或利用future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。

  • 小结:

    相比较于传统的阻塞I/O模型,异步处理不会造成线程阻塞的情况,在I/O期间还可以执行别的业务程序,因此在高并发场景下,更稳定和有更高的吞吐量。

三、Netty 核心模块组件

1. Bootstrap

​ 引导类,与Tomcat引导类相同,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,链式编程串联各个组件。

​ Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

2. Future、ChannelFuture

​ IO 操作都是异步的,通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件(sync返回)。

3. Channel

​ Channel是执行I/O操作的通道,例如建立连接,绑定端口,读写都是由Channel来完成

  • NioSocketChannel,异步的客户端 TCP Socket 连接。

  • NioServerSocketChannel,异步的服务器端 TCP Socket 连接。

4. BoosGroup(Selector实例)

​ BoosGroup中的EventLoop,是一个Selector实例,Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以轮询监听多个连接的 Channel 事件,从而高效管理多个Channel。

5. ChannelHandler

​ 处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序

在这里插入图片描述

需要重写几个方法:

//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
}

//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}

//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新(再写到管道)
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//异常时
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

6. Pipeline和ChannelPipeline

​ ChannelPipeline是保存ChannelHandler的双向链表,用于处理或拦截Channel的入站事件和出站操作,实现了一种高级形式的拦截过滤器模式

在这里插入图片描述

每个 Channel 都有且仅有一个 ChannelPipeline 与之对应。

  • 一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler

  • 入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰

ChannelHandlerContext(ctx)可以获取管道和通道,已经对应的Handler

例如:都是通过ChannelHandlerContext进行操作对应的通道I/O

//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
}

7. EventLoopGroup、NioEventLoopGroup

​ 在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。

在这里插入图片描述

解读:BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来交给WorkerEventLoopGroup 来进行 IO 处理

8. 心跳检测机制

​ Netty中自带了心跳检测的拦截器,当检测到读或者写超时时,就会触发,然后传递给下一个handler userEventTiggered

//加入一个netty 提供 IdleStateHandler处理空闲状态的处理器
pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS));

// 当 IdleStateEvent 触发后 , 就会传递给管道 的下一个handler去处理通过调用(触发)下一个handler 的 userEventTiggered , 在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)

//加入一个对空闲检测进一步处理的handler(自定义)
pipeline.addLast(new MyServerHandler());
public class MyServerHandler extends ChannelInboundHandlerAdapter {

/**
*
* @param ctx 上下文
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if(evt instanceof IdleStateEvent) {

//将 evt 向下转型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);
System.out.println("服务器做相应处理..");

//如果发生空闲,我们关闭通道
// ctx.channel().close();
}
}
}

9. 入站、出站机制

​ ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。

入站实现ChannelInboundHandlerAdapter,出站同理Outbound

在这里插入图片描述

入、出站是针对Socket而言,Socket读出来输入到管道就是入站,反之就是出站。(客户端和服务器端各有Pipeline,是个相对概念)

例如:编码器就是客户端管道往socket里写,准备传输,则是出站,解码器是往服务器端管道读,则是入站。

10. 编解码器Encoder和Decoder

​ 因为进行的是网络传输,数据在网络中又是二进制的字节码数据,所以发送时编码,接收时解码成对象

在这里插入图片描述

由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题。

11.TCP粘包与拆包

  • 概念:发送数据时,会出现数据传输不完整的如下情况:

    (1)D1,D2独立被读,没有发生粘包,拆包

    (2)服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为TCP粘包

    (3)服务端分两次读取,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为TCP拆包

在这里插入图片描述

  • 原因:TCP是面向连接的,面向流的,提供高可靠性服务。收发两端有一成对的socket,因此,发送端为了将多个发给接收端的包,将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界

  • 解决方案:

    (1)关键:获取到每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的TCP 粘包、拆包 。

    (2)通常使用自定义协议包 + 编解码器 来实现

//协议包
public class MessageProtocol {
private int len; //关键
private byte[] content;

public int getLen() {
return len;
}

public void setLen(int len) {
this.len = len;
}

public byte[] getContent() {
return content;
}

public void setContent(byte[] content) {
this.content = content;
}
}

​ 发送方:

(1)编码器:将数据长度、内容写入,方便下一个Handler

public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被调用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}

(2)Handler,直接创建一个对象,往通道里写,进行I/O

//创建协议包对象
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
ctx.writeAndFlush(messageProtocol);

接收方:同理,拿到后将字节码转变成自定义的对象,然后handler拿到长度,进行数据操作。