1. 案例需求
编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)实现多人群聊服务器端:可以监测用户上线,离线,并实现消息转发功能客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
目的:进一步理解NIO非阻塞网络编程机制


2. 流程
服务端
- 监听客户端连接
 
- 初始化工作,实例化selector,实例化serverSocketChannel.设置非阻塞。注册ACCEPT事件
 
- 监听客户端连接请求,将socketChannel注册到selector,注册为READ事件。
 
- 读取客户端信息,打印输出
 
- 将客户端信息转发到其他的客户端
 
- 获取所有的keys。如果channel instanceof socketChannel && 不是当前的客户端的channel,向其他客户端channel写入数据。
 
客户端
- 发送信息
初始化工作,实例化selector,实例化SocketChannel,连接服务器.设置非阻塞。向socketChannel写入信息 
- 接受信息
selector.select()看是否有事件,有则遍历key,获取通道读取数据。
注意:读写数据都是通过channel。
三种方法获取channel: 
- serverSocketChannel.accept() 服务端 
 
- socketChannel.open(new InetSocketAddress(“127.0.0.1”, PORT)); 客户端 
 
- SocketChannel sc = (SocketChannel) key.channel(); 通过selector的selectionKey获取(事件获取)
 
3. 代码
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
   | public class GroupChatServer {          private Selector selector;     private ServerSocketChannel listenChannel;     private static final int PORT = 6667;               public GroupChatServer() {         try {                          selector = Selector.open();                          listenChannel =  ServerSocketChannel.open();                          listenChannel.socket().bind(new InetSocketAddress(PORT));                          listenChannel.configureBlocking(false);                          listenChannel.register(selector, SelectionKey.OP_ACCEPT);         }catch (IOException e) {             e.printStackTrace();         }     }          public void listen() {         System.out.println("监听线程: " + Thread.currentThread().getName());         try {                          while (true) {                 int count = selector.select();                 if(count > 0) {                                          Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();                     while (iterator.hasNext()) {                                                  SelectionKey key = iterator.next();                                                  if(key.isAcceptable()) {                             SocketChannel sc = listenChannel.accept();                             sc.configureBlocking(false);                                                          sc.register(selector, SelectionKey.OP_READ);                                                          System.out.println(sc.getRemoteAddress() + " 上线 ");                         }                         if(key.isReadable()) {                                                           readData(key);                         }                                                  iterator.remove();                     }
                  } else {                     System.out.println("等待....");                 }             }         }catch (Exception e) {             e.printStackTrace();         }finally {                      }     }
           private void readData(SelectionKey key) {                  SocketChannel channel = null;         try {                         channel = (SocketChannel) key.channel();                          ByteBuffer buffer = ByteBuffer.allocate(1024);             int count = channel.read(buffer);                          if(count > 0) {                                  String msg = new String(buffer.array());                                  System.out.println("form 客户端: " + msg);
                                   sendInfoToOtherClients(msg, channel);             }         }catch (IOException e) {             try {                 System.out.println(channel.getRemoteAddress() + " 离线了..");                                  key.cancel();                                  channel.close();             }catch (IOException e2) {                 e2.printStackTrace();;             }         }     }          private void sendInfoToOtherClients(String msg, SocketChannel self ) throws  IOException{         System.out.println("服务器转发消息中...");         System.out.println("服务器转发数据给客户端线程: " + Thread.currentThread().getName());                  for(SelectionKey key: selector.keys()) {                          Channel targetChannel = key.channel();                          if(targetChannel instanceof  SocketChannel && targetChannel != self) {                                  SocketChannel dest = (SocketChannel)targetChannel;                                  ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());                                  dest.write(buffer);             }         }
      }     public static void main(String[] args) {                  GroupChatServer groupChatServer = new GroupChatServer();         groupChatServer.listen();     } }
  | 
 
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
   |  public class GroupChatClient {
           private final String HOST = "127.0.0.1";      private final int PORT = 6667;      private Selector selector;     private SocketChannel socketChannel;     private String username;          public GroupChatClient() throws IOException {         selector = Selector.open();                  socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT));                  socketChannel.configureBlocking(false);                  socketChannel.register(selector, SelectionKey.OP_READ);                  username = socketChannel.getLocalAddress().toString().substring(1);         System.out.println(username + " is ok...");     }
           public void sendInfo(String info) {         info = username + " 说:" + info;         try {             socketChannel.write(ByteBuffer.wrap(info.getBytes()));         }catch (IOException e) {             e.printStackTrace();         }     }          public void readInfo() {         try {             int readChannels = selector.select();             if(readChannels > 0) {                 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();                 while (iterator.hasNext()) {                     SelectionKey key = iterator.next();                     if(key.isReadable()) {                                                 SocketChannel sc = (SocketChannel) key.channel();                                                 ByteBuffer buffer = ByteBuffer.allocate(1024);                                                  sc.read(buffer);                                                  String msg = new String(buffer.array());                         System.out.println(msg.trim());                     }                 }                 iterator.remove();              } else {                              }
          }catch (Exception e) {             e.printStackTrace();         }     }
      public static void main(String[] args) throws Exception {                  GroupChatClient chatClient = new GroupChatClient();                  new Thread() {             public void run() {                 while (true) {                     chatClient.readInfo();                     try {                         Thread.currentThread().sleep(3000);                     }catch (InterruptedException e) {                         e.printStackTrace();                     }                 }             }         }.start();                  Scanner scanner = new Scanner(System.in);         while (scanner.hasNextLine()) {             String s = scanner.nextLine();             chatClient.sendInfo(s);         }     } }
 
 
  |