未读代码 未读代码
首页
  • Java 18 新功能介绍
  • Java 17 新功能介绍
  • Java 16 新功能介绍
  • Java 15 新功能介绍
  • Java 14 新功能介绍
  • Java 8 新特性

    • Java 8 Lambda 表达式
    • Java 8 Stream 流式操作
    • Java 8 时间处理介绍
    • Java 8 Optional 介绍
  • Java 开发工具
Java 源码分析
Spring Boot 系列
  • Arthas 问题定位
  • JMH 基准测试
GitHub (opens new window)
首页
  • Java 18 新功能介绍
  • Java 17 新功能介绍
  • Java 16 新功能介绍
  • Java 15 新功能介绍
  • Java 14 新功能介绍
  • Java 8 新特性

    • Java 8 Lambda 表达式
    • Java 8 Stream 流式操作
    • Java 8 时间处理介绍
    • Java 8 Optional 介绍
  • Java 开发工具
Java 源码分析
Spring Boot 系列
  • Arthas 问题定位
  • JMH 基准测试
GitHub (opens new window)
  • Java 开发

  • Java 开发工具

  • 消息中间件

    • IO通信模型(一)同步阻塞模式BIO(Blocking IO)
      • IO通信模型(二)同步非阻塞模式NIO(NonBlocking IO)
      • IO通信模型(三)多路复用IO
      • 消息队列中间件(一)介绍
      • 消息队列中间件(二)使用 ActiveMQ
      • 消息队列中间件(三)Kafka 入门指南
    • Java 开发
    • 消息中间件
    程序猿阿朗
    2018-10-23

    IO通信模型(一)同步阻塞模式BIO(Blocking IO)

    # 几个概念

    阻塞IO 和非阻塞IO 这两个概念是程序级别的。主要描述的是程序请求操作系统IO操作后,如果IO资源没有准备好,那么程序该如何处理的问题:前者等待;后者继续执行(但是使用线程一直轮询,直到有IO资源准备好了)。

    同步IO 和 异步IO,这两个概念是操作系统级别的。主要描述的是操作系统在收到程序请求IO操作后,如果IO资源没有准备好,该如何响应程序的问题:前者不响应,直到IO资源准备好以后;后者返回一个标记(好让程序和自己知道以后的数据往哪里通知),当IO资源准备好以后,再用事件机制返回给程序。

    # 同步阻塞模式(Blocking IO)

    同步阻塞IO模型是最简单的IO模型,用户线程在内核进行IO操作时如果数据没有准备号会被阻塞。

    BIO 图片来源:www.masterraghu.com

    伪代码表示如下:

    {
        // 阻塞,直到有数据
    	read(socket, buffer);
    	process(buffer);
    }
    
    1
    2
    3
    4
    5

    BIO通信方式的特点:

    1. 一个线程负责连接,多线程则为每一个接入开启一个线程。
    2. 一个请求一个应答。
    3. 请求之后应答之前客户端会一直等待(阻塞)。

    BIO通信方式在单线程服务器下一次只能处理一个请求,在处理完毕之前一直阻塞。因此不适用于高并发的情况。不过可以使用多线程稍微改进。

    BIO通信模型-来源于慕课网

    # Java同步阻塞模式

    Java中的阻塞模式BIO,就是在java.net包中的Socket套接字的实现,Socket套接字是TCP/UDP等传输层协议的实现。

    # Java同步阻塞模式编码

    # 多线程客户端

    为了测试服务端程序,可以先编写一个多线程客户端用于请求测试。

    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * <p>
     * BIO测试
     * 模拟20个客户端并发请求,服务端则使用单线程。
     *
     * @Author niujinpeng
     * @Date 2018/10/15 10:50
     */
    public class SocketClient {
        public static void main(String[] args) throws InterruptedException {
            Integer clientNumber = 20;
            CountDownLatch countDownLatch = new CountDownLatch(clientNumber);
    
            // 分别启动20个客户端
            for (int index = 0; index < clientNumber; index++, countDownLatch.countDown()) {
                SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index);
                new Thread(client).start();
            }
    
            synchronized (SocketClient.class) {
                SocketClient.class.wait();
            }
        }
    }
    
    /**
     * <p>
     * 客户端,用于模拟请求
     *
     * @Author niujinpeng
     * @Date 2018/10/15 10:53
     */
    class SocketClientRequestThread implements Runnable {
    
        private CountDownLatch countDownLatch;
    
        /**
         * 线程的编号
         */
        private Integer clientIndex;
    
    
        public SocketClientRequestThread(CountDownLatch countDownLatch, Integer clientIndex) {
            this.countDownLatch = countDownLatch;
            this.clientIndex = clientIndex;
        }
    
        @Override
        public void run() {
            Socket socket = null;
            OutputStream clientRequest = null;
            InputStream clientResponse = null;
            try {
                socket = new Socket("localhost", 83);
                clientRequest = socket.getOutputStream();
                clientResponse = socket.getInputStream();
    
                //等待,直到SocketClientDaemon完成所有线程的启动,然后所有线程一起发送请求
                this.countDownLatch.await();
    
                // 发送请求信息
                clientRequest.write(("这是第" + this.clientIndex + "个客户端的请求").getBytes());
                clientRequest.flush();
    
                // 等待服务器返回消息
                System.out.println("第" + this.clientIndex + "个客户端请求发送完成,等待服务器响应");
                int maxLen = 1024;
                byte[] contentBytes = new byte[maxLen];
                int realLen;
                String message = "";
    
                // 等待服务端返回,in和out不能cloese
                while ((realLen = clientResponse.read(contentBytes, 0, maxLen)) != -1) {
                    message += new String(contentBytes, 0, realLen);
                }
                System.out.println("第" + this.clientIndex + "个客户端接受到来自服务器的消息:" + message);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (clientRequest != null) {
                        clientRequest.close();
                    }
                    if (clientRequest != null) {
                        clientResponse.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102

    # 单线程服务端

    因为Java中的Socket就是BIO的模式,因此我们可以很简单的编写一个BIO单线程服务端。

    SocketServer.java

    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * BIO服务端
     * <p>
     * 单线程阻塞的服务器端
     *
     * @Author niujinpeng
     * @Date 2018/10/15 11:17
     */
    public class SocketServer {
    
        public static void main(String[] args) throws IOException {
            ServerSocket serverSocket = new ServerSocket(83);
            try {
                while (true) {
                    // 阻塞,直到有数据准备完毕
                    Socket socket = serverSocket.accept();
    
                    // 开始收取信息
                    InputStream input = socket.getInputStream();
                    OutputStream output = socket.getOutputStream();
                    Integer sourcePort = socket.getPort();
                    int maxLen = 1024 * 2;
                    byte[] contextBytes = new byte[maxLen];
    
                    // 阻塞,直到有数据准备完毕
                    int realLen = input.read(contextBytes, 0, maxLen);
                    // 读取信息
                    String message = new String(contextBytes, 0, realLen);
    
                    // 输出接收信息
                    System.out.println("服务器收到来自端口【" + sourcePort + "】的信息:" + message);
                    // 响应信息
                    output.write("Done!".getBytes());
    
                    // 关闭
                    output.close();
                    input.close();
                    socket.close();
    
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            }
        }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57

    # 多线程服务端

    单线程服务器,在处理请求时只能同时处理一条,也就是说如果在请求到来时发现有请求尚未处理完毕,只能等待处理,因此使用多线程改进服务端。

    SocketServerThread.java

    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * BIO服务端
     * <p>
     * 多线程的阻塞的服务端
     * <p>
     * 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。
     * 但还是改变不了socket被一个一个的做accept()的情况。
     *
     * @Author niujinpeng
     * @Date 2018/10/15 11:17
     */
    public class SocketServerThread implements Runnable {
    
        /**
         * 日志
         */
        private static final Logger logger = LoggerFactory.getLogger(SocketServerThread.class);
    
        private Socket socket;
    
        public SocketServerThread(Socket socket) {
            this.socket = socket;
        }
    
        public static void main(String[] args) throws Exception {
            ServerSocket serverSocket = new ServerSocket(83);
            try {
                while (true) {
                    Socket socket = serverSocket.accept();
                    //当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。
                    //最终改变不了.accept()只能一个一个接受socket的情况,并且被阻塞的情况
                    SocketServerThread socketServerThread = new SocketServerThread(socket);
                    new Thread(socketServerThread).start();
                }
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            }
        }
    
    
        @Override
        public void run() {
            InputStream in = null;
            OutputStream out = null;
            try {
                //下面我们收取信息
                in = socket.getInputStream();
                out = socket.getOutputStream();
                Integer sourcePort = socket.getPort();
                int maxLen = 1024;
                byte[] contextBytes = new byte[maxLen];
                //使用线程,同样无法解决read方法的阻塞问题,
                //也就是说read方法处同样会被阻塞,直到操作系统有数据准备好
                int realLen = in.read(contextBytes, 0, maxLen);
                //读取信息
                String message = new String(contextBytes, 0, realLen);
    
                //下面打印信息
                logger.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);
    
                //下面开始发送信息
                out.write("回发响应信息!".getBytes());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                //试图关闭
                try {
                    if (in != null) {
                        in.close();
                    }
                    if (out != null) {
                        out.close();
                    }
                    if (this.socket != null) {
                        this.socket.close();
                    }
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96

    看起来多线程增加了服务能力,但是很明显多线程改进之后仍有以下局限性:

    • 接收和通知处理结果的过程依旧是单线程的。
    • 系统可以创建的线程数量有限。cat /proc/sys/kernel/threads-max可以查看可以创建的线程数量。
    • 如果线程较多,CPU需要更多的时间切换,处理真正业务的时间就会变少。
    • 创建线程会消耗较多资源,JVM创建一个线程都会默认分配128KB空间。
    • 多线程也无法解决因为调用底层系统的同步IO而决定的同步IO机制。

    # 同步阻塞模式总结

    BIO模式因为进程的阻塞挂起,不会消耗过多的CPU资源,而且开发难度低,比较适合并发量小的网络应用开发。同时很容易发现因为请求IO会阻塞进程,所以不时候并发量大的应用。如果为每一个请求分配一个线程,系统开销就会过大。

    同时在Java中,使用了多线程来处理阻塞模式,也无法解决程序在accept()和read()时候的阻塞问题。因为accept()和read()的IO模式支持是基于操作系统的,如果操作系统发现没有套接字从指定的端口传送过来,那么操作系统就会等待。这样accept()和read()方法就会一直等待。


    GitHub 源码:https://github.com/niumoo/java-toolbox (opens new window)

    此文参考文章:5种IO模型、阻塞IO和非阻塞IO、同步IO和异步IO (opens new window) 此文参考文章:架构设计:系统间通信(3)——IO通信模型和JAVA实践 上篇 (opens new window)

    订阅

    文章持续更新,订阅可以关注「 程序猿阿朗 」公众号或者未读代码博客。

    文章作者: 程序猿阿朗
    文章链接:https://www.wdbyte.com/2018/10/io/io1-bio/
    版权声明:本网站当前文章采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 未读代码!
    #BIO#IO#通信模型#Blocking IO#同步阻塞IO
    上次更新: 2022/12/05, 08:18:32
    Linux配置Tomcat的单机多实例
    IO通信模型(二)同步非阻塞模式NIO(NonBlocking IO)

    ← Linux配置Tomcat的单机多实例 IO通信模型(二)同步非阻塞模式NIO(NonBlocking IO)→

    最近更新
    01
    如何搭建一个自己的音乐服务器
    12-04
    02
    JUnit 5 单元测试教程
    11-17
    03
    使用 StringUtils.split 的坑
    11-02
    更多文章>

    提示:评论前请刷新页面,否则评论的可能不是当前文章。

    Theme by Vdoing | Copyright © 2018-2022 程序猿阿朗 | MIT License | 皖ICP备20000567号-1
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式