未读代码 未读代码
首页
  • Java 18 新功能介绍
  • Java 17 新功能介绍
  • Java 16 新功能介绍
  • Java 15 新功能介绍
  • Java 14 新功能介绍
  • Java 8 新特性

    • Java 8 Lambda 表达式
    • Java 8 Stream 流式操作
    • Java 8 时间处理介绍
    • Java 8 Optional 介绍
  • Java 开发工具
Java 源码分析
Spring Boot 系列
  • Arthas 问题定位
  • JMH 基准测试
GitHub (opens new window)
首页
  • Java 18 新功能介绍
  • Java 17 新功能介绍
  • Java 16 新功能介绍
  • Java 15 新功能介绍
  • Java 14 新功能介绍
  • Java 8 新特性

    • Java 8 Lambda 表达式
    • Java 8 Stream 流式操作
    • Java 8 时间处理介绍
    • Java 8 Optional 介绍
  • Java 开发工具
Java 源码分析
Spring Boot 系列
  • Arthas 问题定位
  • JMH 基准测试
GitHub (opens new window)
  • Java 开发

  • Java 开发工具

  • 消息中间件

    • IO通信模型(一)同步阻塞模式BIO(Blocking IO)
    • IO通信模型(二)同步非阻塞模式NIO(NonBlocking IO)
      • IO通信模型(三)多路复用IO
      • 消息队列中间件(一)介绍
      • 消息队列中间件(二)使用 ActiveMQ
      • 消息队列中间件(三)Kafka 入门指南
    • Java 开发
    • 消息中间件
    程序猿阿朗
    2018-10-25

    IO通信模型(二)同步非阻塞模式NIO(NonBlocking IO)

    # 同步非阻塞模式(NonBlocking IO)

    在非阻塞模式中,发出Socket的accept()和read()操作时,如果内核中的数据还没有准备好,那么它并不会阻塞用户进程,而是立刻返回一个信息。也就是说进程发起一个read操作后,并不需要一直阻塞等待,而是马上就得到了一个结果。 如果结果发现数据准备完毕就可以读取数据,然后拷贝到用户内存。如果结果发现数据没有就绪也会返回,进程继续不断的主动询问数据的准备情况是非阻塞模式的一个特点。 多路复用IO 伪代码表示如下:

    {
    	while(read(socket, buffer) != SUCCESS){    
    	}
    	process(buffer);
    }
    
    1
    2
    3
    4
    5

    # Java同步非阻塞模式

    如上所述,Java的Socket是阻塞模式的典型应用。在发起accpet()和read()请求之后会持续阻塞,但是Java中提供了setSoTimeout()方法设置超时时间,在固定时间内没有得到结果,就会结束本次阻塞,等待进行下一次的阻塞轮询。这是,也就实现了应用层面的非阻塞。

    Java中Socket中的setSoTimeout()方法:

    public synchronized void setSoTimeout(int timeout) throws SocketException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (timeout < 0)
            throw new IllegalArgumentException("timeout can't be negative");
        getImpl().setOption(SocketOptions.SO_TIMEOUT, new Integer(timeout));
    }
    
    1
    2
    3
    4
    5
    6
    7

    # Java同步非阻塞模式编码

    通过设置setSoTimeout()使阻塞模式的服务端accpet()和read()优化为非阻塞模式。 SocketServerNioListenAndRead.java

    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketException;
    import java.net.SocketTimeoutException;
    
    /**
     * <p>
     * 非阻塞IO - 监听非阻塞 - 读取非阻塞
     *
     * @Author niujinpeng
     * @Date 2018/10/15 14:53
     */
    public class SocketServerNioListenAndRead {
        /**
         * 日志
         */
        private static final Logger logger = LoggerFactory.getLogger(SocketServerNioListenAndRead.class);
        private static Object xWait = new Object();
    
        public static void main(String[] args) throws IOException {
            ServerSocket serverSocket = null;
    
            try {
                serverSocket = new ServerSocket(83);
                serverSocket.setSoTimeout(100);
                while (true) {
                    Socket socket = null;
                    try {
                        socket = serverSocket.accept();
                    } catch (SocketTimeoutException e) {
                        synchronized (SocketServerNioListenAndRead.xWait) {
                            logger.info("没有从底层接收到任务数据报文,等待10ms,,模拟事件X的处理时间");
                            SocketServerNioListenAndRead.xWait.wait(10);
                        }
                        continue;
                    }
    
                    InputStream input = socket.getInputStream();
                    OutputStream output = socket.getOutputStream();
                    Integer sourcePort = socket.getPort();
                    int maxLen = 2048;
                    byte[] contentBytes = new byte[maxLen];
                    int realLen;
                    StringBuffer message = new StringBuffer();
    
                    // 接收消息非阻塞实现
                    socket.setSoTimeout(10);
    
                    BIORead:
                    while (true) {
                        try {
                            // 读取的时候,程序会阻塞,知道系统把网络传过来的数据准备完毕
                            while ((realLen = input.read(contentBytes, 0, maxLen)) != -1) {
                                message.append(new String(contentBytes, 0, realLen));
                                /**
                                 * 如果收到over,表示传送完毕
                                 */
                                if (message.toString().endsWith("over")) {
                                    break BIORead;
                                }
                            }
                        } catch (SocketTimeoutException e) {
                            //===========================================================
                            //      执行到这里,说明本次read没有接收到任何数据流
                            //      主线程在这里又可以做一些事情,记为Y
                            //===========================================================
                            logger.info("这次没有从底层接收到任务数据报文,等待10毫秒,模拟事件Y的处理时间");
                            continue;
                        }
    
                    }
    
                    // 输出信息
                    logger.info("服务器收到来自端口" + sourcePort + "的消息:" + message.toString());
                    // 响应
                    output.write("Done!".getBytes());
    
                    output.close();
                    input.close();
                    socket.close();
                }
            } catch (SocketException | InterruptedException e) {
                logger.error(e.getMessage(), e);
            } finally {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            }
        }
    
    }
    
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100

    上面的代码可以实现监听和读取数据的非阻塞,但是还是只能一个一个的处理,可以使用多线程稍微改进。 SocketServerNioListenThread.java

    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketTimeoutException;
    
    /**
     * <p>
     * 非阻塞IO - 监听非阻塞 - 读取非阻塞
     * 通过加入线程的概念,让socket server能够在应用层面
     * 通过非阻塞的方式同时处理多个socket套接字
     * <p>
     * 此时可以实现非阻塞的IO,但是因为调用了系统底层的阻塞同步IO,
     * 因此仍然没有从根本上解决问题
     *
     * @Author niujinpeng
     * @Date 2018/10/15 15:23
     */
    public class SocketServerNioListenThread {
    
        private static Object xWait = new Object();
    
        private static final Logger LOGGER = LoggerFactory.getLogger(SocketServerNioListenThread.class);
    
        public static void main(String[] args) throws Exception {
            ServerSocket serverSocket = new ServerSocket(83);
            serverSocket.setSoTimeout(100);
            try {
                while (true) {
                    Socket socket = null;
                    try {
                        socket = serverSocket.accept();
                    } catch (SocketTimeoutException e1) {
                        //===========================================================
                        //      执行到这里,说明本次accept没有接收到任何TCP连接
                        //      主线程在这里就可以做一些事情,记为X
                        //===========================================================
                        synchronized (SocketServerNioListenThread.xWait) {
                            LOGGER.info("这次没有从底层接收到任何TCP连接,等待10毫秒,模拟事件X的处理时间");
                            SocketServerNioListenThread.xWait.wait(10);
                        }
                        continue;
                    }
                    //当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。
                    //最终改变不了.accept()只能一个一个接受socket连接的情况
                    SocketServerThread socketServerThread = new SocketServerThread(socket);
                    new Thread(socketServerThread).start();
                }
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            } finally {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            }
        }
    }
    
    /**
     * 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。
     * 但还是改变不了socket被一个一个的做accept()的情况。
     *
     * @author niujinpeng
     */
    class SocketServerThread implements Runnable {
    
        /**
         * 日志
         */
        private static final Logger LOGGER = LoggerFactory.getLogger(SocketServerThread.class);
    
        private Socket socket;
    
        public SocketServerThread(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
            InputStream in = null;
            OutputStream out = null;
            try {
                in = socket.getInputStream();
                out = socket.getOutputStream();
                Integer sourcePort = socket.getPort();
                int maxLen = 2048;
                byte[] contextBytes = new byte[maxLen];
                int realLen;
                StringBuffer message = new StringBuffer();
                //下面我们收取信息(设置成非阻塞方式,这样read信息的时候,又可以做一些其他事情)
                this.socket.setSoTimeout(10);
                BIORead:
                while (true) {
                    try {
                        while ((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                            message.append(new String(contextBytes, 0, realLen));
                            /*
                             * 我们假设读取到“over”关键字,
                             * 表示客户端的所有信息在经过若干次传送后,完成
                             * */
                            if (message.indexOf("over") != -1) {
                                break BIORead;
                            }
                        }
                    } catch (SocketTimeoutException e2) {
                        //===========================================================
                        //      执行到这里,说明本次read没有接收到任何数据流
                        //      主线程在这里又可以做一些事情,记为Y
                        //===========================================================
                        LOGGER.info("这次没有从底层接收到任务数据报文,等待10毫秒,模拟事件Y的处理时间");
                        continue;
                    }
                }
                //下面打印信息
                Long threadId = Thread.currentThread().getId();
                LOGGER.info("服务器(线程:" + threadId + ")收到来自于端口:" + sourcePort + "的信息:" + message);
    
                //下面开始发送信息
                out.write("回发响应信息!".getBytes());
    
                //关闭
                out.close();
                in.close();
                this.socket.close();
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134

    # 同步非阻塞模式总结

    用户需要不断地调用,尝试读取数据,直到读取成功后,才继续处理接收的数据。整个IO请求的过程中,虽然用户线程每次发起IO请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的CPU的资源。一般很少直接使用这种模型,而是在其他IO模型中使用非阻塞IO这一特性。

    开发难度相对于阻塞IO模式较难,适合并发小且不需要及时响应的网络应用开发。

    GitHub 源码:https://github.com/niumoo/java-toolbox/ (opens new window) 此文参考文章:IO复用,AIO,BIO,NIO,同步,异步,阻塞和非阻塞 (opens new window) 此文参考文章:6.2 I/O Models (opens new window)

    订阅

    文章持续更新,订阅可以关注「 程序猿阿朗 」公众号或者未读代码博客。

    文章作者: 程序猿阿朗
    文章链接:https://www.wdbyte.com/2018/10/io/io2-nio/
    版权声明:本网站当前文章采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 未读代码!
    #NIO#通信模型#非阻塞IO#NonBlocking IO
    上次更新: 2022/12/05, 08:18:32
    IO通信模型(一)同步阻塞模式BIO(Blocking IO)
    IO通信模型(三)多路复用IO

    ← IO通信模型(一)同步阻塞模式BIO(Blocking IO) IO通信模型(三)多路复用IO→

    最近更新
    01
    如何搭建一个自己的音乐服务器
    12-04
    02
    JUnit 5 单元测试教程
    11-17
    03
    使用 StringUtils.split 的坑
    11-02
    更多文章>

    提示:评论前请刷新页面,否则评论的可能不是当前文章。

    Theme by Vdoing | Copyright © 2018-2022 程序猿阿朗 | MIT License | 皖ICP备20000567号-1
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式