Boost.Asio 是一个高性能的 C++ 网络编程库,广泛应用于各种网络服务开发。本文将介绍 Asio 的几种典型用法,并演示如何进行压力测试。

一、基础概念

在开始之前,先了解几个核心概念:

概念 说明
io_context I/O 上下文,异步操作的执行引擎(旧版本叫 io_service
acceptor 接收器,用于监听和接受新连接
socket 套接字,用于读写数据
async_* 异步操作,如 async_readasync_writeasync_accept

二、最简单用法:单线程模型

这是最基本的用法,一个 io_context 在单线程中运行事件循环。

2.1 Echo 服务器
#include <boost/asio.hpp>
#include <iostream>
#include <memory>

using boost::asio::ip::tcp;

class Session : public std::enable_shared_from_this<Session> {
public:
    explicit Session(tcp::socket socket)
        : socket_(std::move(socket)) {}

    void start() {
        do_read();
    }

private:
    void do_read() {
        auto self = shared_from_this();
        socket_.async_read_some(
            boost::asio::buffer(data_, max_length),
            [this, self](boost::system::error_code ec, std::size_t length) {
                if (!ec) {
                    do_write(length);
                }
            });
    }

    void do_write(std::size_t length) {
        auto self = shared_from_this();
        boost::asio::async_write(
            socket_,
            boost::asio::buffer(data_, length),
            [this, self](boost::system::error_code ec, std::size_t /*length*/) {
                if (!ec) {
                    do_read();
                }
            });
    }

    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
};

class Server {
public:
    Server(boost::asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) {
        do_accept();
    }

private:
    void do_accept() {
        acceptor_.async_accept(
            [this](boost::system::error_code ec, tcp::socket socket) {
                if (!ec) {
                    std::make_shared<Session>(std::move(socket))->start();
                }
                do_accept();
            });
    }

    tcp::acceptor acceptor_;
};

int main(int argc, char* argv[]) {
    if (argc != 2) {
        std::cerr << "Usage: " << argv[0] << " <port>\n";
        return 1;
    }

    unsigned short port = std::atoi(argv[1]);

    boost::asio::io_context io_context;
    Server server(io_context, port);

    std::cout << "Server listening on port " << port << std::endl;

    // 单线程运行事件循环
    io_context.run();

    return 0;
}
2.2 特点
  • 优点:简单、无需考虑线程安全
  • 缺点:无法利用多核 CPU,并发性能受限
  • 适用场景:客户端程序、低并发服务

三、多线程模型:单 io_context + 线程池

这种模式只有一个 io_context,但多个线程同时调用 run(),所有线程共享同一个事件循环。

3.1 实现
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <vector>
#include <thread>

using boost::asio::ip::tcp;

// Session 类同上,省略...

class Server {
public:
    Server(boost::asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) {
        do_accept();
    }

private:
    void do_accept() {
        acceptor_.async_accept(
            [this](boost::system::error_code ec, tcp::socket socket) {
                if (!ec) {
                    std::make_shared<Session>(std::move(socket))->start();
                }
                do_accept();
            });
    }

    tcp::acceptor acceptor_;
};

int main(int argc, char* argv[]) {
    if (argc != 3) {
        std::cerr << "Usage: " << argv[0] << " <port> <threads>\n";
        return 1;
    }

    unsigned short port = std::atoi(argv[1]);
    int thread_count = std::atoi(argv[2]);

    boost::asio::io_context io_context;
    Server server(io_context, port);

    std::cout << "Server listening on port " << port 
              << " with " << thread_count << " threads" << std::endl;

    // 创建线程池
    std::vector<std::thread> threads;
    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back([&io_context]() {
            io_context.run();
        });
    }

    // 主线程也参与事件循环
    io_context.run();

    for (auto& t : threads) {
        t.join();
    }

    return 0;
}
3.2 线程安全注意事项

多线程共享 io_context 时,回调函数可能在不同线程执行,需要注意:

  1. acceptor 操作async_accept 是线程安全的
  2. socket 操作:同一 socket 的异步操作不能并发执行
  3. 共享资源:如需访问共享数据,使用 strand 保证串行执行
class SafeSession : public std::enable_shared_from_this<SafeSession> {
public:
    explicit SafeSession(tcp::socket socket)
        : socket_(std::move(socket)),
          strand_(socket_.get_executor()) {}  // 创建 strand

    void start() {
        do_read();
    }

private:
    void do_read() {
        auto self = shared_from_this();
        // 使用 strand 包装,确保回调串行执行
        boost::asio::async_read_some(
            socket_,
            boost::asio::buffer(data_, max_length),
            boost::asio::bind_executor(
                strand_,
                [this, self](boost::system::error_code ec, std::size_t length) {
                    if (!ec) {
                        do_write(length);
                    }
                }));
    }

    void do_write(std::size_t length) {
        auto self = shared_from_this();
        boost::asio::async_write(
            socket_,
            boost::asio::buffer(data_, length),
            boost::asio::bind_executor(
                strand_,
                [this, self](boost::system::error_code ec, std::size_t) {
                    if (!ec) {
                        do_read();
                    }
                }));
    }

    tcp::socket socket_;
    boost::asio::strand<tcp::socket::executor_type> strand_;
    enum { max_length = 1024 };
    char data_[max_length];
};
3.3 特点
  • 优点:简单、负载均衡自动完成、连接均匀分布
  • 缺点:锁竞争(内部)、单 io_context 可能成为瓶颈
  • 适用场景:中等并发服务

四、多 io_context 模型:每个线程一个 io_context

这种模式为每个线程创建独立的 io_context,消除锁竞争,适合超高并发场景。

4.1 实现
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <vector>
#include <thread>
#include <atomic>

using boost::asio::ip::tcp;

class Session : public std::enable_shared_from_this<Session> {
    // 同上,省略...
};

class IoContextPool {
public:
    explicit IoContextPool(std::size_t pool_size)
        : next_index_(0) {
        if (pool_size == 0) {
            throw std::runtime_error("pool_size must be > 0");
        }

        for (std::size_t i = 0; i < pool_size; ++i) {
            auto io_context = std::make_shared<boost::asio::io_context>();
            io_contexts_.push_back(io_context);
            work_.push_back(std::make_shared<boost::asio::io_context::work>(*io_context));
        }
    }

    void run() {
        std::vector<std::thread> threads;
        for (auto& io_context : io_contexts_) {
            threads.emplace_back([io_context]() {
                io_context->run();
            });
        }

        for (auto& t : threads) {
            t.join();
        }
    }

    boost::asio::io_context& get_io_context() {
        // 轮询选择 io_context
        return *io_contexts_[next_index_++ % io_contexts_.size()];
    }

private:
    std::vector<std::shared_ptr<boost::asio::io_context>> io_contexts_;
    std::vector<std::shared_ptr<boost::asio::io_context::work>> work_;
    std::atomic<std::size_t> next_index_;
};

class Server {
public:
    Server(IoContextPool& pool, unsigned short port)
        : acceptor_(pool.get_io_context(), tcp::endpoint(tcp::v4(), port)),
          io_context_pool_(pool) {
        do_accept();
    }

private:
    void do_accept() {
        // 为新连接选择一个 io_context
        auto& io_context = io_context_pool_.get_io_context();
        
        acceptor_.async_accept(
            io_context,
            [this](boost::system::error_code ec, tcp::socket socket) {
                if (!ec) {
                    std::make_shared<Session>(std::move(socket))->start();
                }
                do_accept();
            });
    }

    tcp::acceptor acceptor_;
    IoContextPool& io_context_pool_;
};

int main(int argc, char* argv[]) {
    if (argc != 3) {
        std::cerr << "Usage: " << argv[0] << " <port> <threads>\n";
        return 1;
    }

    unsigned short port = std::atoi(argv[1]);
    std::size_t threads = std::atoi(argv[2]);

    IoContextPool pool(threads);
    Server server(pool, port);

    std::cout << "Server listening on port " << port 
              << " with " << threads << " io_context threads" << std::endl;

    pool.run();

    return 0;
}
4.2 特点
  • 优点:无锁竞争、最大程度利用多核、每线程独立事件循环
  • 缺点:实现复杂、连接分布不均可能导致负载不均
  • 适用场景:超高并发服务、CPU 密集型场景
4.3 优化:CPU 亲和性

为进一步提升性能,可以绑定线程到特定 CPU 核心:

void bind_to_cpu(int cpu) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
}

// 在线程启动时调用
threads.emplace_back([io_context, i]() {
    bind_to_cpu(i % std::thread::hardware_concurrency());
    io_context->run();
});

五、三种模式对比

模式 复杂度 并发性能 锁竞争 适用场景
单线程 客户端、低并发
单 io_context + 线程池 ⭐⭐ 中等并发服务
多 io_context ⭐⭐⭐ 高并发、CPU 密集型

六、压力测试

压力测试是验证网络服务性能的关键步骤。下面介绍如何编写压测客户端和使用专业工具。

6.1 自定义压测客户端

使用 Asio 编写压测客户端,模拟大量并发连接:

#include <boost/asio.hpp>
#include <iostream>
#include <vector>
#include <memory>
#include <atomic>
#include <chrono>

using boost::asio::ip::tcp;

class Client : public std::enable_shared_from_this<Client> {
public:
    Client(boost::asio::io_context& io_context,
           const tcp::resolver::results_type& endpoints,
           std::atomic<uint64_t>& total_requests,
           std::atomic<uint64_t>& total_bytes)
        : socket_(io_context),
          endpoints_(endpoints),
          total_requests_(total_requests),
          total_bytes_(total_bytes) {}

    void start() {
        do_connect();
    }

private:
    void do_connect() {
        boost::asio::async_connect(
            socket_,
            endpoints_,
            [this](boost::system::error_code ec, tcp::endpoint) {
                if (!ec) {
                    do_write();
                } else {
                    // 连接失败,重试
                    std::this_thread::sleep_for(std::chrono::milliseconds(100));
                    do_connect();
                }
            });
    }

    void do_write() {
        auto self = shared_from_this();
        boost::asio::async_write(
            socket_,
            boost::asio::buffer(message_),
            [this, self](boost::system::error_code ec, std::size_t length) {
                if (!ec) {
                    do_read();
                }
            });
    }

    void do_read() {
        auto self = shared_from_this();
        socket_.async_read_some(
            boost::asio::buffer(reply_, max_length),
            [this, self](boost::system::error_code ec, std::size_t length) {
                if (!ec) {
                    total_requests_.fetch_add(1);
                    total_bytes_.fetch_add(length);
                    do_write();  // 继续发送
                }
            });
    }

    tcp::socket socket_;
    tcp::resolver::results_type endpoints_;
    std::atomic<uint64_t>& total_requests_;
    std::atomic<uint64_t>& total_bytes_;
    enum { max_length = 1024 };
    char reply_[max_length];
    std::string message_ = "Hello, Server!";
};

int main(int argc, char* argv[]) {
    if (argc != 5) {
        std::cerr << "Usage: " << argv[0] << " <host> <port> <connections> <threads>\n";
        return 1;
    }

    std::string host = argv[1];
    std::string port = argv[2];
    int connections = std::atoi(argv[3]);
    int threads = std::atoi(argv[4]);

    boost::asio::io_context io_context;
    tcp::resolver resolver(io_context);
    auto endpoints = resolver.resolve(host, port);

    std::atomic<uint64_t> total_requests{0};
    std::atomic<uint64_t> total_bytes{0};

    // 创建多个客户端连接
    for (int i = 0; i < connections; ++i) {
        auto client = std::make_shared<Client>(
            io_context, endpoints, std::ref(total_requests), std::ref(total_bytes));
        client->start();
    }

    // 启动统计线程
    std::atomic<bool> running{true};
    std::thread stats_thread([&]() {
        auto last_time = std::chrono::steady_clock::now();
        uint64_t last_requests = 0;
        uint64_t last_bytes = 0;

        while (running) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            auto now = std::chrono::steady_clock::now();
            auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - last_time).count();

            if (elapsed > 0) {
                uint64_t current_requests = total_requests.load();
                uint64_t current_bytes = total_bytes.load();

                uint64_t req_per_sec = (current_requests - last_requests) / elapsed;
                double mb_per_sec = (current_bytes - last_bytes) / 1024.0 / 1024.0 / elapsed;

                std::cout << "QPS: " << req_per_sec 
                          << ", Throughput: " << mb_per_sec << " MB/s" << std::endl;

                last_requests = current_requests;
                last_bytes = current_bytes;
                last_time = now;
            }
        }
    });

    // 运行事件循环
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < threads; ++i) {
        thread_pool.emplace_back([&io_context]() {
            io_context.run();
        });
    }

    // 等待用户输入停止
    std::cin.get();
    running = false;
    stats_thread.join();

    for (auto& t : thread_pool) {
        t.join();
    }

    return 0;
}
6.2 使用专业压测工具

wrk 是一个高效的 HTTP 压测工具:

# 安装
git clone https://github.com/wg/wrk.git
cd wrk && make

# 使用 12 线程、1000 连接、持续 30 秒
wrk -t12 -c1000 -d30s http://localhost:8080/

# 输出示例
Running 30s test @ http://localhost:8080/
  12 threads and 1000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    15.32ms   12.45ms 150.00ms   78.32%
    Req/Sec    5.45k     1.23k    8.90k    71.23%
  1952345 requests in 30.02s, 1.45GB read
Requests/sec:  65031.25
Transfer/sec:     49.32MB

TcpKali 专门用于 TCP 压测:

# 安装
brew install tcpkali  # macOS
# 或编译: https://github.com/machinezone/tcpkali

# 1000 并发连接,持续 60 秒
tcpkali -c 1000 -d 60s -e "Hello, Server!" 127.0.0.1:8080
6.3 压测指标
指标 说明 评估标准
QPS 每秒请求数 越高越好
延迟 请求响应时间 P99 < 100ms 较好
吞吐量 每秒数据量 取决于业务
连接数 同时活跃连接 验证连接管理
CPU 使用率 服务端 CPU 占用 < 80% 较健康
6.4 压测注意事项
  1. 客户端机器:确保压测客户端性能足够,不要成为瓶颈
  2. 网络环境:本地测试避免网络延迟干扰,生产测试考虑真实网络
  3. 资源监控:监控服务端 CPU、内存、网络、磁盘 IO
  4. 预热:测试前先预热,让系统进入稳定状态
  5. 多次测试:多次测试取平均值,排除偶然因素

七、总结

本文介绍了 Boost.Asio 的三种典型用法:

  1. 单线程模型:简单易用,适合低并发场景
  2. 单 io_context + 线程池:中等复杂度,适合中等并发
  3. 多 io_context 模型:高性能无锁设计,适合高并发场景

同时介绍了压力测试的方法,包括自定义压测客户端和专业工具的使用。

选择哪种模式取决于实际需求,建议从简单开始,根据性能测试结果逐步优化。

八、参考

  1. Boost.Asio 官方文档
  2. wrk - HTTP benchmarking tool
  3. TcpKali - TCP load testing