1. 案例需求
编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)实现多人群聊服务器端:可以监测用户上线,离线,并实现消息转发功能客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
目的:进一步理解NIO非阻塞网络编程机制
2. 流程
服务端
- 监听客户端连接
- 初始化工作,实例化selector,实例化serverSocketChannel.设置非阻塞。注册ACCEPT事件
- 监听客户端连接请求,将socketChannel注册到selector,注册为READ事件。
- 读取客户端信息,打印输出
- 将客户端信息转发到其他的客户端
- 获取所有的keys。如果channel instanceof socketChannel && 不是当前的客户端的channel,向其他客户端channel写入数据。
客户端
- 发送信息
初始化工作,实例化selector,实例化SocketChannel,连接服务器.设置非阻塞。向socketChannel写入信息
- 接受信息
selector.select()看是否有事件,有则遍历key,获取通道读取数据。
注意:读写数据都是通过channel。
三种方法获取channel:
- serverSocketChannel.accept() 服务端
- socketChannel.open(new InetSocketAddress(“127.0.0.1”, PORT)); 客户端
- SocketChannel sc = (SocketChannel) key.channel(); 通过selector的selectionKey获取(事件获取)
3. 代码
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
| public class GroupChatServer { private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667; public GroupChatServer() { try { selector = Selector.open(); listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(PORT)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); }catch (IOException e) { e.printStackTrace(); } } public void listen() { System.out.println("监听线程: " + Thread.currentThread().getName()); try { while (true) { int count = selector.select(); if(count > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if(key.isAcceptable()) { SocketChannel sc = listenChannel.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); System.out.println(sc.getRemoteAddress() + " 上线 "); } if(key.isReadable()) { readData(key); } iterator.remove(); }
} else { System.out.println("等待...."); } } }catch (Exception e) { e.printStackTrace(); }finally { } }
private void readData(SelectionKey key) { SocketChannel channel = null; try { channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); if(count > 0) { String msg = new String(buffer.array()); System.out.println("form 客户端: " + msg);
sendInfoToOtherClients(msg, channel); } }catch (IOException e) { try { System.out.println(channel.getRemoteAddress() + " 离线了.."); key.cancel(); channel.close(); }catch (IOException e2) { e2.printStackTrace();; } } } private void sendInfoToOtherClients(String msg, SocketChannel self ) throws IOException{ System.out.println("服务器转发消息中..."); System.out.println("服务器转发数据给客户端线程: " + Thread.currentThread().getName()); for(SelectionKey key: selector.keys()) { Channel targetChannel = key.channel(); if(targetChannel instanceof SocketChannel && targetChannel != self) { SocketChannel dest = (SocketChannel)targetChannel; ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); dest.write(buffer); } }
} public static void main(String[] args) { GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } }
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| public class GroupChatClient {
private final String HOST = "127.0.0.1"; private final int PORT = 6667; private Selector selector; private SocketChannel socketChannel; private String username; public GroupChatClient() throws IOException { selector = Selector.open(); socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok..."); }
public void sendInfo(String info) { info = username + " 说:" + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); }catch (IOException e) { e.printStackTrace(); } } public void readInfo() { try { int readChannels = selector.select(); if(readChannels > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if(key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); sc.read(buffer); String msg = new String(buffer.array()); System.out.println(msg.trim()); } } iterator.remove(); } else { }
}catch (Exception e) { e.printStackTrace(); } }
public static void main(String[] args) throws Exception { GroupChatClient chatClient = new GroupChatClient(); new Thread() { public void run() { while (true) { chatClient.readInfo(); try { Thread.currentThread().sleep(3000); }catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); chatClient.sendInfo(s); } } }
|