3.8 既然有 HTTP 协议,为什么还要有 RPC?
Netty学习之NIO基础 - Nyima’s Blog
Netty 学习手册

JavaNIO:

基本概念:

Netty可以理解为NIO的改进,为了更好的理解Netty我们需要先了解NIO:

1.NIO 有三大核心部分: Channel(通道)、Buffer(缓冲区)、Selector(选择器)
2.NIO 是面向缓冲区,或者面向块编程的。
3.NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情
4.NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配 50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个。
5.HTTP 2.0 使用了多路复用的技术,做到同一个连接并发处理多个请求

image.png

  1. 每个 Channel 都会对应一个 Buffer。
  2. Selector 对应一个线程,一个线程对应多个 Channel(连接)。
  3. 该图反应了有三个 Channel 注册到该 Selector //程序
  4. 程序切换到哪个 Channel 是由事件决定的,Event 就是一个重要的概念。事件是指发生在通道上的某种状态或操作,比如通道已准备好读取数据、通道已准备好写入数据等。
  5. Selector 会根据不同的事件,在各个通道上切换。
  6. Buffer 就是一个内存块,底层是有一个数组。
  7. 数据的读取写入是通过 Buffer,这个和 BIO,BIO 中要么是输入流,或者是输出流,不能双向,但是 NIO 的 Buffer 是可以读也可以写,需要 flip 方法切换 Channel 是双向的,可以返回底层操作系统的情况,比如 Linux,底层的操作系统通道就是双向的。
  8. 客户端总是跟 Buffer 读写, 后由 Channel 完成服务器与 Buffer 间通信

缓冲区:

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组)

核心属性:
1
2
3
4
5
6
7
8
9
10
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;

capacity:缓冲区的容量。
limit:缓冲区的界限;
position:下一个读写位置的索引;
mark:记录当前position的值

put方法:

  • put()方法可以将一个数据放入到缓冲区中。
  • 进行该操作后,postition的值会+1,指向下一个可以放入的位置。capacity = limit ,为缓冲区容量的值。


flip()方法
  • flip()方法会切换对缓冲区的操作模式,由写->读 / 读->写
  • 进行该操作后
    • 如果是写模式->读模式,position = 0 , limit 指向最后一个元素的下一个位置,capacity不变
    • 如果是读->写,则恢复为put()方法中的值


get()方法
  • get()方法会读取缓冲区中的一个值
  • 进行该操作后,position会+1,如果超过了limit则会抛出异常


还有rewind()和clean()方法

分散读取

分散读取(Scattering Reads)是指从Channel 中读取的数据“分散”到多个Buffer 中

聚集写入

聚集写入(Gathering Writes)是指将多个Buffer 中的数据“聚集”到Channel

通道:

  1. NIO 的通道类似于流,但有些区别如下:

    • 通道可以同时进行读写,而流只能读或者只能写
    • 通道可以实现异步读写数据
    • 通道可以从缓冲读数据,也可以写数据到缓冲:
    • socket 就绪会形成⼀个通道. 通道代表⼀个就绪的socket

选择器:

NIO中⾮阻塞I/O采⽤了基于Reactor模式的⼯作⽅式,I/O调⽤不会被阻塞,⽽是注册感兴趣的特 定I/O事件,如可读数据到达、新的套接字连接等,在发⽣特定事件时,系统再通知我们。NIO中 实现⾮阻塞I/O的核⼼对象是Selector,Selector是注册各种I/O事件的地⽅,⽽且当那些事件发⽣ 时,就是Seleetor告诉我们所发⽣的事件

  1. Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到 Selector(选择器)。
  2. Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个 Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。【示意图】
  3. 只有在连接/通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。
  4. 避免了多线程之间的上下文切换导致的开销。

image.png

  1. Netty 的 IO 线程 NioEventLoop 聚合了 Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
  2. 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
  3. 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
  4. 由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
  5. 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

Netty概述:

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
注意:netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO

工作原理:

主从反应模型的Reactor:
image.png
简单版:
Netty 主要基于主从 Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor
image.png

  1. BossGroup 线程维护 Selector,只关注 Accecpt
  2. 当接收到 Accept 事件,获取到对应的 SocketChannel,封装成 NIOScoketChannel 并注册到 Worker 线程(事件循环),并进行维护
  3. 当 Worker 线程监听到 Selector 中通道发生自己感兴趣的事件后,就进行处理(就由 handler),注意 handler 已经加入到通道

详细版:
image.png

  1. Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写
  2. BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
  3. NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是 NioEventLoop
  4. NioEventLoop 表示一个不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket 的网络通讯
  5. NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop
  6. 每个 BossNioEventLoop 循环执行的步骤有 3 步
    • 轮询 accept 事件
    • 处理 accept 事件,与 client 建立连接,生成 NioScocketChannel,并将其注册到某个 worker NIOEventLoop 上的 Selector
    • 处理任务队列的任务,即 runAllTasks
  7. 每个 Worker NIOEventLoop 循环执行的步骤
    • 轮询 read,write 事件
    • 处理 I/O 事件,即 read,write 事件,在对应 NioScocketChannel 处理
    • 处理任务队列的任务,即 runAllTasks
  8. 每个 Worker NIOEventLoop 处理业务时,会使用 pipeline(管道),pipeline 中包含了 channel,即通过 pipeline 可以获取到对应通道,管道中维护了很多的处理器

案例:

eg1:

  1. Netty 服务器在 6668 端口监听,客户端能发送消息给服务器"hello,服务器~"
  2. 服务器可以回复消息给客户端"hello,客户端~"
  3. 目的:对 Netty 线程模型有一个初步认识,便于理解 Netty 模型理论
  4. 看老师代码演示 5.1 编写服务端 5.2 编写客户端 5.3 对 netty 程序进行分析,看看 netty 模型特点 说明:创建 Maven 项目,并引入 Netty 包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class NettyServer {

public static void main(String[] args) throws Exception {

//创建BossGroup 和 WorkerGroup
//说明
//1. 创建两个线程组 bossGroup 和 workerGroup
//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
//3. 两个都是无限循环
//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
// 默认实际 cpu核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8

try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
// .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup

.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客户socketchannel hashcode=" + ch.hashCode());
//可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel
//对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
ch.pipeline().addLast(new NettyServerHandler());
}
}); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器

System.out.println(".....服务器 is ready...");

//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();

//给cf 注册监听器,监控我们关心的事件

cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});

//对关闭通道进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

/**
* 说明
* 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
* 2. 这时我们自定义一个Handler , 才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

//读取数据实际(这里我们可以读取客户端发送的消息)
/**
* 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
* 2. Object msg: 就是客户端发送的数据 默认Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
System.out.println("server ctx =" + ctx);
System.out.println("看看channel 和 pipeline的关系");
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站

//将 msg 转成一个 ByteBuf
//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + channel.remoteAddress());
}

//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}

//处理异常, 一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class NettyClient {

public static void main(String[] args) throws Exception {

//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {

//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
}
});

System.out.println("客户端 ok..");
//启动客户端去连接服务器端
//关于 ChannelFuture 要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
}

//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: " + ctx.channel().remoteAddress());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

提示: 每个 Channel 都将会被分配⼀个 ChannelPipeline ,而ChannelPipeline中都有一个handler相当于Channel中都绑定了一个handler处理器。
eg2:

  1. 用户程序自定义的普通任务【举例说明】
  2. 用户自定义定时任务
  3. 非当前 Reactor 线程调用 Channel 的各种方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费

Netty异步模型:

NioEventLoop 中可以按照一定顺序进行数据处理 , 如数据到来后 , 按照下面的流程执行一系列操作 ;
读取数据 -> 数据解码 -> 业务逻辑处理 -> 数据编码 -> 数据发送

NioEventLoop 中封装内容 :

  • 选择器 Selector
  • 任务队列 TaskQueue
  • 调度任务队列 ScheduleTaskQueue
  • NIO 通道 NioChannel
  • 管道 ChannelPipeline

1 . 异步操作概念 :

调用者调用一个异步操作后 , 并不能马上知道该操作的返回值 , 该操作也不会马上执行完成 , 该操作完成后 , 会通过回调机制 , 如 通知 , 注册的回调函数等机制通知调用者 ;

2 . Netty 中的异步操作与 ChannelFuture 返回值 :

① 异步操作 : Netty 模型中凡是关于 IO 的操作 , 如绑定端口 ( Bind ) , 远程连接 ( Connect ) , 读取数据 ( Read ) , 写出数据 ( Write ) 等操作都是异步操作 ;
② 异步操作返回值 : 上述 IO 操作返回值都是 ChannelFuture 类型实例 , ChannelFuture 是异步 IO 操作的返回结果 ;
③ 在服务器端绑定端口号时 , 调用 Bootstrap 的 bind 方法 , 会返回 ChannelFuture 对象 ;
④ 在客户端调用 Bootstrap 的 connect 方法 , 也会返回 ChannelFuture 对象 ;

3 . Netty 中的异步操作机制 :

① Future-Listener 机制 : Future 表示当前不知道结果 , 在未来的某个时刻才知道结果 , Listener 表示监听操作 , 监听返回的结果 ;
② Netty 异步模型的两个基础 : Future ( ChannelFuture 未来知道结果 ) , Callback ( 监听回调 ) ;

4 . 以客户端写出数据到服务器端为例 :

客户端写出数据 : 客户端调用写出数据方法 ChannelFuture writeAndFlush(Object msg) , 向服务器写出数据 ;
操作耗时 : 假设在服务器中接收到该数据后 , 要执行一个非常耗时的操作才能返回结果 , 就是操作非常耗时 ;
客户端不等待 : 客户端这里写出了数据 , 肯定不能阻塞等待写出操作的结果 , 需要立刻执行下面的操作 , 因此该方法是异步的 ;
客户端监听 : writeAndFlush 方法返回一个 ChannelFuture 对象 , 如果客户端需要该操作的返回结果 , 那么通过 ChannelFuture 可以监听该写出方法是否成功 ;
5 . 异步操作返回结果 :
① 返回结果 : Future 表示异步 IO 操作执行结果 , 通过该 Future 提供的 检索 , 计算 等方法检查异步操作是否执行完成 ;
② 常用接口 : ChannelFuture 继承了 Future , 也是一个接口 , 可以为该接口对象注册监听器 , 当异步任务完成后会回调该监听器方法 ;

1
public interface ChannelFuture extends Future<Void>

6 . Future 链式操作 : 这里以读取数据 , 处理后返回结果为例 ;

  • 数据读取操作 ;
  • 对读取的数据进行解码处理 ;
  • 执行业务逻辑
  • 将数据编码 ;
  • 将编码后的数据写出 ;

上述 5 个步骤 , 每个数据处理操作 , 都有与之对应的 Handler 处理器 ;
异步机制 : 在 Handler 处理器中需要实现异步机制 , 一般使用 Callback 回调 , 或 Future 机制 ;
链式操作优势 : 上述的链式操作 , 简洁 , 高效 , 可以让开发者快速开发高性能 , 高可靠性服务器 , 只关注业务逻辑 , 不用过多的将精力浪费在网络基础功能开发上 ;这里的网络基础功能就是高可靠性 , 高性能的网络传输模块 ;

Future-Listener 机制:

1 . Future-Listener 机制 :
① Future 返回值 : 在 Netty 中执行 IO 操作 , 如 bind , read , write , connect 等方法 , 会立刻返回 ChannelFuture 对象 ;
② ChannelFuture 返回时状态 : 调用 IO 方法后 , 立刻返回 ChannelFuture 对象 , 此时该操作未完成 ;
③ 注册监听器 : ChannelFuture 可以设置 ChannelFutureListener 监听器 , 监听该 IO 操作完成状态 , 如果 IO 操作完成 , 那么会回调其 public void operationComplete(ChannelFuture future) throws Exception 接口实现方法 ;
④ IO 操作执行状态判定 : 在 operationComplete 方法中通过 调用 ChannelFuture future 参数的如下方法 , 判定当前 IO 操作完成状态 ;
future.isDone() : IO 操作是否完成 ;
future.isSuccess() : IO 操作是否成功 ; ( 常用 )
future.isCancelled() : IO 操作是否被取消 ;
future.cause() : IO 操作的失败原因 ;

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.example.nettyFuture;

/**
* @author redA
* @时间: 2024年03月12日 8:19
*/

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* Netty 案例服务器端
*/
public class Server {
public static void main(String[] args) {

// 1. 创建 BossGroup 线程池 和 WorkerGroup 线程池, 其中维护 NioEventLoop 线程
// NioEventLoop 线程中执行无限循环操作

// BossGroup 线程池 : 负责客户端的连接
// 指定线程个数 : 客户端个数很少, 不用很多线程维护, 这里指定线程池中线程个数为 1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// WorkerGroup 线程池 : 负责客户端连接的数据读写
EventLoopGroup workerGroup = new NioEventLoopGroup();

// 2. 服务器启动对象, 需要为该对象配置各种参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 设置 主从 线程组 , 分别对应 主 Reactor 和 从 Reactor
.channel(NioServerSocketChannel.class) // 设置 NIO 网络套接字通道类型
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列维护的连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置连接状态行为, 保持连接状态
.childHandler( // 为 WorkerGroup 线程池对应的 NioEventLoop 设置对应的事件 处理器 Handler
new ChannelInitializer<SocketChannel>() {// 创建通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 该方法在服务器与客户端连接建立成功后会回调
// 为 管道 Pipeline 设置处理器 Hanedler
// 这里暂时设置为 null , 执行不会失败 , 服务器绑定端口会成功
ch.pipeline().addLast(null);
}
}
);
System.out.println("服务器准备完毕 ...");

ChannelFuture channelFuture = null;
try {
// 绑定本地端口, 进行同步操作 , 并返回 ChannelFuture
channelFuture = bootstrap.bind(8888).sync();
System.out.println("服务器开始监听 8888 端口 ...");

// ( 本次示例核心代码 ) ----------------------------------------------------------
// 监听绑定操作的结果 ( 本次示例核心代码 )
// 添加 ChannelFutureListener 监听器, 监听 bind 操作的结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isDone()){
System.out.println("绑定端口完成");
}

if(future.isSuccess()){
System.out.println("绑定端口成功");
}else{
System.out.println("绑定端口失败");
}

if(future.isCancelled()){
System.out.println("绑定端口取消");
}

System.out.println("失败原因 : " + future.cause());
}
});
// ( 本次示例核心代码 ) ----------------------------------------------------------


// 关闭通道 , 开始监听操作
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 出现异常后, 优雅的关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}

image.png

创建HTTP服务:

  1. 实例要求:使用 IDEA 创建 Netty 项目
  2. Netty 服务器在 6668 端口监听,浏览器发出请求 http://localhost:6668/
  3. 服务器可以回复消息给客户端"Hello!我是服务器5",并对特定请求资源进行过滤。
  4. 目的:Netty 可以做 Http 服务开发,并且理解 Handler 实例和客户端及其请求的关系。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TestServer {

public static void main(String[] args) throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitializer());

ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();

channelFuture.channel().closeFuture().sync();

} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {

//向管道加入处理器

//得到管道
ChannelPipeline pipeline = ch.pipeline();

//加入一个netty 提供的httpServerCodec codec =>[coder - decoder]
//HttpServerCodec 说明
//1. HttpServerCodec 是netty 提供的处理http的 编-解码器
pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
//2. 增加一个自定义的handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());

System.out.println("ok~~~~");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 说明
* 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
* 2. HttpObject 客户端和服务器端相互通讯的数据被封装成 HttpObject
*/
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

//channelRead0 读取客户端数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

System.out.println("对应的channel=" + ctx.channel() + " pipeline=" + ctx
.pipeline() + " 通过pipeline获取channel" + ctx.pipeline().channel());

System.out.println("当前ctx的handler=" + ctx.handler());

//判断 msg 是不是 httprequest请求
if (msg instanceof HttpRequest) {

System.out.println("ctx 类型=" + ctx.getClass());

System.out.println("pipeline hashcode" + ctx.pipeline().hashCode() + " TestHttpServerHandler hash=" + this.hashCode());

System.out.println("msg 类型=" + msg.getClass());
System.out.println("客户端地址" + ctx.channel().remoteAddress());

//获取到
HttpRequest httpRequest = (HttpRequest) msg;
//获取uri, 过滤指定的资源
URI uri = new URI(httpRequest.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println("请求了 favicon.ico, 不做响应");
return;
}
//回复信息给浏览器 [http协议]

ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器", CharsetUtil.UTF_8);

//构造一个http的相应,即 httpresponse
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);

response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

//将构建好 response返回
ctx.writeAndFlush(response);

}
}
}

核心组件:

Netty 学习手册

源码分析1:

【Netty 从成神到升仙系列 一】Netty 服务端的启动源码剖析(一)-CSDN博客