粘包与半包
粘包
现象:发送abc def,接受到abcdef
原因:
- 应用层:接收方ByteBuf设置太大(Netty默认是1024)
- 传输层滑动窗口: 假设发送方256 bytes表示一个完整的报文,接收方的滑动窗口来不及处理且滑动窗口足够大,导致多个报文在一个滑动窗口中,导致粘包。(TCP协议)
- Nagle算法:会造成粘包。Nagle算法是TCP协议中的一种机制,有时候发送一个字节,也需要加入TCP头和IP头,有点浪费网络。为了提高网络利用率,会将少量数据进行延迟发送,积攒起来一起发送。会造成粘包现象。
半包
现象:发送abcdef,接受abc def
原因:
- 应用层:接收方的ByteBuf小于实际发送的数据,导致一个数据报文被拆分了。
- 传输层滑动窗口:假设接收方的滑动窗口大小为128bytes,这时候发送方发了256bytes,滑动窗口接受不过来,让发送方只能先发128bytes,等待ACK后,才能发送剩余的数据。造成一个完整的报文被拆分开。(传输层TCP协议)
- MSS(max segment size)限制:当发送的数据超出MSS限制后,会将数据切分后发送,会造成半包。
本质上是因为TCP协议是基于字节流的(首部没有长度),消息没有边界,所以会造成粘包和半包现象。UDP是面向报文的(首部有长度),所以不会有粘包和半包现象。
解决方案
方法1 短链接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class HelloWorldClient { static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) { for (int i = 0; i < 10; i++) { send(); } }
private static void send() { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { log.debug("conneted..."); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("sending..."); ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}); ctx.writeAndFlush(buffer); ctx.close(); } }); } }); ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync(); channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) { log.error("client error", e); } finally { worker.shutdownGracefully(); } } }
|
短链接相当于发送一个数据就断开一次。只能解决粘包。接受的ByteBuf还是有限的,还是会有半包问题。
方法2 固定长度
客户端和服务端商量好发送的数据包具有固定长度。如果不够长,就用占位符占位。
让所有的数据包长度固定(假设长度为10字节),服务端加入FixedLengthFrameDecoder进行处理。
1
| ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
|
客户端测试代码,注意, 采用这种方法后,客户端什么时候 flush 都可以
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class HelloWorldClient { static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { log.debug("connetted..."); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("sending..."); Random r = new Random(); char c = 'a'; ByteBuf buffer = ctx.alloc().buffer(); for (int i = 0; i < 10; i++) { byte[] bytes = new byte[8]; for (int j = 0; j < r.nextInt(8); j++) { bytes[j] = (byte) c; } c++; buffer.writeBytes(bytes); } ctx.writeAndFlush(buffer); } }); } }); ChannelFuture channelFuture = bootstrap.connect("192.168.0.103", 9090).sync(); channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) { log.error("client error", e); } finally { worker.shutdownGracefully(); } } }
|
缺点是数据包的大小不好把握。长度只能定的太大,占位符就会多。浪费空间。因此长度最好定位最大的数据包的长度。仅仅适用于提前知道数据包的长度的情况。
方法3 固定分隔符
客户端和服务端商量好发送的数据包具有特定的分隔符。服务端用LineBasedFrameDecoder处理,自动将接受到的消息用\n进行分隔。
1
| ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
|
客户端在每条消息之后,加入\n分隔符。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class HelloWorldClient { static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { log.debug("connetted..."); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("sending..."); Random r = new Random(); char c = 'a'; ByteBuf buffer = ctx.alloc().buffer(); for (int i = 0; i < 10; i++) { for (int j = 1; j <= r.nextInt(16)+1; j++) { buffer.writeByte((byte) c); } buffer.writeByte(10); c++; } ctx.writeAndFlush(buffer); } }); } }); ChannelFuture channelFuture = bootstrap.connect("192.168.0.103", 9090).sync(); channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) { log.error("client error", e); } finally { worker.shutdownGracefully(); } } }
|
如果本身传输的数据中有分隔符,会解析错误
方法4 预设长度(推荐)
在发送消息前,先约定用定长字节表示接下来数据的长度。服务端用LengthFieldBasedFrameDecoder来处理。
1 2
| ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1));
|
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package cn.itcast.advance.c1;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler;
public class TestLengthFieldDecoder { public static void main(String[] args) { EmbeddedChannel channel = new EmbeddedChannel( new LengthFieldBasedFrameDecoder( 1024, 0, 4, 1,4), new LoggingHandler(LogLevel.DEBUG) );
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); send(buffer, "Hello, world"); send(buffer, "Hi!"); channel.writeInbound(buffer); }
private static void send(ByteBuf buffer, String content) { byte[] bytes = content.getBytes(); int length = bytes.length; buffer.writeInt(length); buffer.writeByte(1); buffer.writeBytes(bytes); } }
|