第四章Netty第二节入门案例+channel,future,promise介绍
|字数总计:4.7k|阅读时长:21分钟|阅读量:
需求
开发一个简单的服务器端和客户端
- 客户端向服务器端发送 hello, world
- 服务器仅接收,不返回
1 2 3 4 5
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
|
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }) .bind(8080);
|
代码解读:
- 1处创建NIOEventLoopGroup,可以简单理解为线程池+selector
- 2处选择服务socket实现类,其中NIOServerSocketChannel表示基于NIO的服务端实现。其他还有
- 3处,为啥方法骄傲childHandler,是接下来处理器都是给socketChannel用的,而不是给ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端socketChannel建立连接后,执行initChannel以便添加更多的处理器。
- 4 处,ServerSocketChannel 绑定的监听端口
- 5处,socketChannel的处理器,解码ByteBuf=>String
- 6处,socketChannel的业务处理器,使用上一个处理器的处理结果
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080) .sync() .channel() .writeAndFlush(new Date() + ": hello world!");
|
代码解读:
1.1处,创建NIOEventLoopGroup,同server
2. 2处, 选择客户的socket实现类,NIOSocketChannel表示基于NIO的客户端实现类。其他实现还有
- 3处,添加socketChannel的处理器,ChannelInitializer处理器(仅执行一次),它的作用是待客户端socketChannel建立连接后,执行initChannel以便添加跟多的处理器。
- 4 处,指定要连接的服务器和端口
- 5处,Netty中很多方法都是异步的,如connect,这时需要使用sync方法等待connect建立连接完毕。
- 6处,获取channel对象,它即为通道抽象,可以进行数据读写。
- 7处,写入消息并清空缓冲区
- 8处,建立连接后,消息会经过通道handler处理,这里是将string=>ByteBuf发出。
- 数据经过网络传输,到达服务端,服务端5和6处的handlerx先后被触发走完一个流程。
流程梳理
注意:
- 把channel理解为数据的通道
- 把msg理解为流动的数据,最开始输入的是ByteBuf,但经过pipeLine的加工,会变成其他类型的对象,最后输出又变成ByteBuf
- 把handler理解为数据的处理工序
- 工序有多道,合在一起就是pipeline.pipeline负责发布事件(读,读取完成。。。)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- handler分Inbound和outbound两类
- 把eventloop理解为处理数据的工人
- 工人可以管理多个channel的io操作。并且一旦工人负责了某个channel就要负责到底(绑定)
- 工人既可以执行io操作,也可以进行任务处理。每位工人有任务队列,队列里可以堆放多个channel待处理的任务,把任务分为普通任务和定时任务。
- 工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每一道工序执行不同的工人。
组件-EventLoop
事件循环对象 EventLoop
本质上是一个单线程执行器同时维护了一个selector,里面有run方法处理channel上源源不断的io事件。
继承关系:
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor
- 提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
- 提供了parent方法来看看自己属于哪个EventLoopGroup
事件循环组 EventLoopGroup
是一组EventLoop,channel一般会调用EventLoopGroup的register方法来绑定一个EventLoop,后续这个channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)
继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
以一个简单的实现为例:1 2 3 4 5
| DefaultEventLoopGroup group = new DefaultEventLoopGroup(2); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next());
|
输出1 2 3
| io.netty.channel.DefaultEventLoop@60f82f98 io.netty.channel.DefaultEventLoop@35f983a6 io.netty.channel.DefaultEventLoop@60f82f98
|
也可以使用 for 循环
1 2 3 4
| DefaultEventLoopGroup group = new DefaultEventLoopGroup(2); for (EventExecutor eventLoop : group) { System.out.println(eventLoop); }
|
输出
1 2
| io.netty.channel.DefaultEventLoop@60f82f98 io.netty.channel.DefaultEventLoop@35f983a6
|
优雅关闭
优雅关闭 shutdownGracefully
方法。该方法会首先切换 EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
演示NioEventLoop处理io事件
服务端两个Nio worker工人
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| new ServerBootstrap() .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null; if (byteBuf != null) { byte[] buf = new byte[16]; ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes()); log.debug(new String(buf)); } } }); } }).bind(8080).sync();
|
客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws InterruptedException { Channel channel = new Bootstrap() .group(new NioEventLoopGroup(1)) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { System.out.println("init..."); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }) .channel(NioSocketChannel.class).connect("localhost", 8080) .sync() .channel();
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes())); Thread.sleep(2000); channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
|
最后输出
1 2 3 4 5 6
| 22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan 22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan 22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi 22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi 22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu 22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
|
可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定
再增加两个非 nio 工人
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2); new ServerBootstrap() .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(normalWorkers,"myhandler", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null; if (byteBuf != null) { byte[] buf = new byte[16]; ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes()); log.debug(new String(buf)); } } }); } }).bind(8080).sync();
|
客户端代码不变,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] REGISTERED 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] ACTIVE 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7a 68 61 6e 67 73 61 6e |zhangsan | +--------+-------------------------------------------------+----------------+ 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE 22:19:48 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan 22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7a 68 61 6e 67 73 61 6e |zhangsan | +--------+-------------------------------------------------+----------------+ 22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE 22:19:50 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan 22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] REGISTERED 22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] ACTIVE 22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 6c 69 73 69 |lisi | +--------+-------------------------------------------------+----------------+ 22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE 22:20:25 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi 22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 6c 69 73 69 |lisi | +--------+-------------------------------------------------+----------------+ 22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE 22:20:27 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] REGISTERED 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] ACTIVE 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 77 61 6e 67 77 75 |wangwu | +--------+-------------------------------------------------+----------------+ 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE 22:20:38 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu 22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 77 61 6e 67 77 75 |wangwu | +--------+-------------------------------------------------+----------------+ 22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE 22:20:40 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu
|
可以看到,nio 工人和 非 nio 工人也分别绑定了 channel(LoggingHandler 由 nio 工人执行,而我们自己的 handler 由非 nio 工人执行)
handler 执行中如何换人?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
|
- 如果两个handler绑定的是同一个线程,那么在当前线程中直接调用
- 否则,提交给一个新的线程调用。
演示 NioEventLoop 处理普通任务
NioEventLoop 除了可以处理 io 事件,同样可以向它提交普通任务
1 2 3 4 5 6 7 8 9 10 11 12
| EventLoopGroup group = new NioEventLoopGroup(2); group.next().submit(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("ok"); }); log.debug("main");
|
输出
1 2
| 19:52:29 [DEBUG] [main] c.i.n.c.TestEventLoop - main 19:52:30 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
|
可以用来执行耗时较长的任务,不会被阻塞。
演示 NioEventLoop 处理定时任务
1 2 3 4 5 6 7 8
| EventLoopGroup group = new NioEventLoopGroup(2);
group.next().scheduleAtFixedRate(() -> { log.debug("ok"); }, 0, 1, TimeUnit.SECONDS);
log.debug("main");
|
1 2 3 4
| 19:53:32 [DEBUG] [main] c.i.n.c.TestEventLoop - main 19:53:32 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok 19:53:33 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok 19:53:34 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
|
可以用来执行定时任务。
组件-Channel
channel的主要方法
- close() 可以用来关闭 channel
- closeFuture() 用来处理 channel 的关闭
- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入到缓冲区中,但没有将数据发出。
- writeAndFlush() 方法将数据写入缓存去,并立刻将缓冲区数据刷出。会立即将数据发出。
ChannelFuture
1 2 3 4 5 6 7 8 9 10 11 12 13
| ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080);
channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");
|
说明:
1处返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
注意:connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象。加了sync才能变成同步,一直阻塞直到建立连接。然后执行后面的才能正确发送数据。
实验如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); channelFuture.sync(); System.out.println(channelFuture.channel());
|
- 执行到 1 时,连接未建立,打印
[id: 0x2e1884dd]
- 执行到 2 时,sync 方法是同步等待连接建立完成
- 执行到 3 时,连接肯定建立了,打印
[id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]
除了用 sync 方法可以让异步操作同步以外,还可以使用回调的方式:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080); System.out.println(channelFuture.channel()); channelFuture.addListener((ChannelFutureListener) future -> { System.out.println(future.channel()); });
|
核心:channelFuture.addListener()。注意带有future,promise的类型都可以和addListener方法配套使用的。
说明:这种方法不同于上面的同步的方法,同步的方法是有main线程完成打印的操作。而这里main线程直接将任务交给其他线程以异步的方式执行,当连接建立完成后自动完成打印操作。
CloseFuture
用法基本同上面的channelFuture。closeFuture是用来处理关闭之后的操作的,一般用来关闭EventLoopGroup。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| @Slf4j public class CloseFutureClient { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup group new NioEventLoopGroup(); ChannelFuture channelFuture = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)); Channel channel = channelFuture.sync().channel(); log.debug("{}", channel); new Thread(()->{ Scanner scanner = new Scanner(System.in); while (true) { String line = scanner.nextLine(); if ("q".equals(line)) { channel.close();
break; } channel.writeAndFlush(line); } }, "input").start();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { log.debug("处理关闭之后的操作"); group.shutdownGracefully(); } }); } }
|
组件-Future&Promise
在异步处理的时候,经常用到这两个接口。
netty中的Future和JDK中的Future同名。Netty的Future是继承自JDK的Future。Promise是对Netty中的Future进行拓展(Extend)。
- JDK中的Future只能同步等待任务结束(或成功或失败)才能得到结果。
- netty Future可以同步等待任务结束得到结果,也可以异步(addListener)得到结果。但都要等任务结束
- netty Promise不仅有netty Future的功能,而且可以「主动创建」一个Promise,作为两个线程间传递结果的容器。
功能/名称 |
jdk Future |
netty Future |
Promise |
cancel |
取消任务 |
- |
- |
isCanceled |
任务是否取消 |
- |
- |
isDone |
任务是否完成,不能区分成功失败 |
- |
- |
get |
获取任务结果,阻塞等待 |
- |
- |
getNow |
- |
获取任务结果,非阻塞,还未产生结果时返回 null |
- |
await |
- |
等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 |
- |
sync |
- |
等待任务结束,如果任务失败,抛出异常 |
- |
isSuccess |
- |
判断任务是否成功 |
- |
cause |
- |
获取失败信息,非阻塞,如果没有失败,返回null |
- |
addLinstener |
- |
添加回调,异步接收结果 |
- |
setSuccess |
- |
- |
设置成功结果 |
setFailure |
- |
- |
设置失败结果 |
JDK Future
只能通过同步等待任务结束,future.get()会阻塞,直到拿到结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ExecutorService service = Executors.newFixedThreadPool(2); Future<Integer> future = service.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("执行计算"); Thread.sleep(1000); return 50; } }); log.debug("等待结果"); log.debug("结果是 {}", future.get());
|
1 2 3
| 22:29:50 [DEBUG] [pool-1-thread-1] c.i.n.c.TestJdkFuture - 执行计算 22:29:50 [DEBUG] [main] c.i.n.c.TestJdkFuture - 等待结果 22:29:51 [DEBUG] [main] c.i.n.c.TestJdkFuture - 结果是 50
|
netty Future
可以通过同步(get方法会阻塞)也可以通过异步(addListener不会阻塞,交给另外的线程处理)获得结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("执行计算"); Thread.sleep(1000); return 70; } });
future.addListener(new GenericFutureListener<Future<? super Integer>>(){ @Override public void operationComplete(Future<? super Integer> future) throws Exception { log.debug("接收结果:{}", future.getNow()); } });
|
上面异步的方式输出结果:
1 2
| 22:34:24 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestNettyFuture - 执行计算 22:34:25 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestNettyFuture - 接收结果:70
|
netty Promise(常用的)
Promise 继承自(extend)netty的Future。不仅有Future的功能,而且能主动创建,作为线程 间存放数据的容器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| EventLoop eventLoop = new NioEventLoopGroup().next(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop); new Thread(() -> { log.debug("开始计算..."); try { int i = 1 / 0; Thread.sleep(1000); promise.setSuccess(80); } catch (Exception e) { e.printStackTrace(); promise.setFailure(e); }
}).start(); log.debug("等待结果..."); log.debug("结果是: {}", promise.get());
|
1 2 3
| 22:37:03 [DEBUG] [main] c.i.n.c.TestNettyPromise - 等待结果... 22:37:03 [DEBUG] [Thread-0] c.i.n.c.TestNettyPromise - 开始计算... 22:37:04 [DEBUG] [main] c.i.n.c.TestNettyPromise - 结果是: 80
|
后续netty一般都用Promise