Boost.Asio 网络库用法与压力测试
Boost.Asio 是一个高性能的 C++ 网络编程库,广泛应用于各种网络服务开发。本文将介绍 Asio 的几种典型用法,并演示如何进行压力测试。
一、基础概念
在开始之前,先了解几个核心概念:
| 概念 | 说明 |
|---|---|
io_context |
I/O 上下文,异步操作的执行引擎(旧版本叫 io_service) |
acceptor |
接收器,用于监听和接受新连接 |
socket |
套接字,用于读写数据 |
async_* |
异步操作,如 async_read、async_write、async_accept |
二、最简单用法:单线程模型
这是最基本的用法,一个 io_context 在单线程中运行事件循环。
2.1 Echo 服务器
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
using boost::asio::ip::tcp;
class Session : public std::enable_shared_from_this<Session> {
public:
explicit Session(tcp::socket socket)
: socket_(std::move(socket)) {}
void start() {
do_read();
}
private:
void do_read() {
auto self = shared_from_this();
socket_.async_read_some(
boost::asio::buffer(data_, max_length),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
do_write(length);
}
});
}
void do_write(std::size_t length) {
auto self = shared_from_this();
boost::asio::async_write(
socket_,
boost::asio::buffer(data_, length),
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec) {
do_read();
}
});
}
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class Server {
public:
Server(boost::asio::io_context& io_context, unsigned short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket) {
if (!ec) {
std::make_shared<Session>(std::move(socket))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <port>\n";
return 1;
}
unsigned short port = std::atoi(argv[1]);
boost::asio::io_context io_context;
Server server(io_context, port);
std::cout << "Server listening on port " << port << std::endl;
// 单线程运行事件循环
io_context.run();
return 0;
}
2.2 特点
- 优点:简单、无需考虑线程安全
- 缺点:无法利用多核 CPU,并发性能受限
- 适用场景:客户端程序、低并发服务
三、多线程模型:单 io_context + 线程池
这种模式只有一个 io_context,但多个线程同时调用 run(),所有线程共享同一个事件循环。
3.1 实现
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <vector>
#include <thread>
using boost::asio::ip::tcp;
// Session 类同上,省略...
class Server {
public:
Server(boost::asio::io_context& io_context, unsigned short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket) {
if (!ec) {
std::make_shared<Session>(std::move(socket))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[]) {
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <port> <threads>\n";
return 1;
}
unsigned short port = std::atoi(argv[1]);
int thread_count = std::atoi(argv[2]);
boost::asio::io_context io_context;
Server server(io_context, port);
std::cout << "Server listening on port " << port
<< " with " << thread_count << " threads" << std::endl;
// 创建线程池
std::vector<std::thread> threads;
for (int i = 0; i < thread_count; ++i) {
threads.emplace_back([&io_context]() {
io_context.run();
});
}
// 主线程也参与事件循环
io_context.run();
for (auto& t : threads) {
t.join();
}
return 0;
}
3.2 线程安全注意事项
多线程共享 io_context 时,回调函数可能在不同线程执行,需要注意:
- acceptor 操作:
async_accept是线程安全的 - socket 操作:同一 socket 的异步操作不能并发执行
- 共享资源:如需访问共享数据,使用
strand保证串行执行
class SafeSession : public std::enable_shared_from_this<SafeSession> {
public:
explicit SafeSession(tcp::socket socket)
: socket_(std::move(socket)),
strand_(socket_.get_executor()) {} // 创建 strand
void start() {
do_read();
}
private:
void do_read() {
auto self = shared_from_this();
// 使用 strand 包装,确保回调串行执行
boost::asio::async_read_some(
socket_,
boost::asio::buffer(data_, max_length),
boost::asio::bind_executor(
strand_,
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
do_write(length);
}
}));
}
void do_write(std::size_t length) {
auto self = shared_from_this();
boost::asio::async_write(
socket_,
boost::asio::buffer(data_, length),
boost::asio::bind_executor(
strand_,
[this, self](boost::system::error_code ec, std::size_t) {
if (!ec) {
do_read();
}
}));
}
tcp::socket socket_;
boost::asio::strand<tcp::socket::executor_type> strand_;
enum { max_length = 1024 };
char data_[max_length];
};
3.3 特点
- 优点:简单、负载均衡自动完成、连接均匀分布
- 缺点:锁竞争(内部)、单
io_context可能成为瓶颈 - 适用场景:中等并发服务
四、多 io_context 模型:每个线程一个 io_context
这种模式为每个线程创建独立的 io_context,消除锁竞争,适合超高并发场景。
4.1 实现
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <vector>
#include <thread>
#include <atomic>
using boost::asio::ip::tcp;
class Session : public std::enable_shared_from_this<Session> {
// 同上,省略...
};
class IoContextPool {
public:
explicit IoContextPool(std::size_t pool_size)
: next_index_(0) {
if (pool_size == 0) {
throw std::runtime_error("pool_size must be > 0");
}
for (std::size_t i = 0; i < pool_size; ++i) {
auto io_context = std::make_shared<boost::asio::io_context>();
io_contexts_.push_back(io_context);
work_.push_back(std::make_shared<boost::asio::io_context::work>(*io_context));
}
}
void run() {
std::vector<std::thread> threads;
for (auto& io_context : io_contexts_) {
threads.emplace_back([io_context]() {
io_context->run();
});
}
for (auto& t : threads) {
t.join();
}
}
boost::asio::io_context& get_io_context() {
// 轮询选择 io_context
return *io_contexts_[next_index_++ % io_contexts_.size()];
}
private:
std::vector<std::shared_ptr<boost::asio::io_context>> io_contexts_;
std::vector<std::shared_ptr<boost::asio::io_context::work>> work_;
std::atomic<std::size_t> next_index_;
};
class Server {
public:
Server(IoContextPool& pool, unsigned short port)
: acceptor_(pool.get_io_context(), tcp::endpoint(tcp::v4(), port)),
io_context_pool_(pool) {
do_accept();
}
private:
void do_accept() {
// 为新连接选择一个 io_context
auto& io_context = io_context_pool_.get_io_context();
acceptor_.async_accept(
io_context,
[this](boost::system::error_code ec, tcp::socket socket) {
if (!ec) {
std::make_shared<Session>(std::move(socket))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
IoContextPool& io_context_pool_;
};
int main(int argc, char* argv[]) {
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <port> <threads>\n";
return 1;
}
unsigned short port = std::atoi(argv[1]);
std::size_t threads = std::atoi(argv[2]);
IoContextPool pool(threads);
Server server(pool, port);
std::cout << "Server listening on port " << port
<< " with " << threads << " io_context threads" << std::endl;
pool.run();
return 0;
}
4.2 特点
- 优点:无锁竞争、最大程度利用多核、每线程独立事件循环
- 缺点:实现复杂、连接分布不均可能导致负载不均
- 适用场景:超高并发服务、CPU 密集型场景
4.3 优化:CPU 亲和性
为进一步提升性能,可以绑定线程到特定 CPU 核心:
void bind_to_cpu(int cpu) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
}
// 在线程启动时调用
threads.emplace_back([io_context, i]() {
bind_to_cpu(i % std::thread::hardware_concurrency());
io_context->run();
});
五、三种模式对比
| 模式 | 复杂度 | 并发性能 | 锁竞争 | 适用场景 |
|---|---|---|---|---|
| 单线程 | ⭐ | 低 | 无 | 客户端、低并发 |
| 单 io_context + 线程池 | ⭐⭐ | 中 | 有 | 中等并发服务 |
| 多 io_context | ⭐⭐⭐ | 高 | 无 | 高并发、CPU 密集型 |
六、压力测试
压力测试是验证网络服务性能的关键步骤。下面介绍如何编写压测客户端和使用专业工具。
6.1 自定义压测客户端
使用 Asio 编写压测客户端,模拟大量并发连接:
#include <boost/asio.hpp>
#include <iostream>
#include <vector>
#include <memory>
#include <atomic>
#include <chrono>
using boost::asio::ip::tcp;
class Client : public std::enable_shared_from_this<Client> {
public:
Client(boost::asio::io_context& io_context,
const tcp::resolver::results_type& endpoints,
std::atomic<uint64_t>& total_requests,
std::atomic<uint64_t>& total_bytes)
: socket_(io_context),
endpoints_(endpoints),
total_requests_(total_requests),
total_bytes_(total_bytes) {}
void start() {
do_connect();
}
private:
void do_connect() {
boost::asio::async_connect(
socket_,
endpoints_,
[this](boost::system::error_code ec, tcp::endpoint) {
if (!ec) {
do_write();
} else {
// 连接失败,重试
std::this_thread::sleep_for(std::chrono::milliseconds(100));
do_connect();
}
});
}
void do_write() {
auto self = shared_from_this();
boost::asio::async_write(
socket_,
boost::asio::buffer(message_),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
do_read();
}
});
}
void do_read() {
auto self = shared_from_this();
socket_.async_read_some(
boost::asio::buffer(reply_, max_length),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
total_requests_.fetch_add(1);
total_bytes_.fetch_add(length);
do_write(); // 继续发送
}
});
}
tcp::socket socket_;
tcp::resolver::results_type endpoints_;
std::atomic<uint64_t>& total_requests_;
std::atomic<uint64_t>& total_bytes_;
enum { max_length = 1024 };
char reply_[max_length];
std::string message_ = "Hello, Server!";
};
int main(int argc, char* argv[]) {
if (argc != 5) {
std::cerr << "Usage: " << argv[0] << " <host> <port> <connections> <threads>\n";
return 1;
}
std::string host = argv[1];
std::string port = argv[2];
int connections = std::atoi(argv[3]);
int threads = std::atoi(argv[4]);
boost::asio::io_context io_context;
tcp::resolver resolver(io_context);
auto endpoints = resolver.resolve(host, port);
std::atomic<uint64_t> total_requests{0};
std::atomic<uint64_t> total_bytes{0};
// 创建多个客户端连接
for (int i = 0; i < connections; ++i) {
auto client = std::make_shared<Client>(
io_context, endpoints, std::ref(total_requests), std::ref(total_bytes));
client->start();
}
// 启动统计线程
std::atomic<bool> running{true};
std::thread stats_thread([&]() {
auto last_time = std::chrono::steady_clock::now();
uint64_t last_requests = 0;
uint64_t last_bytes = 0;
while (running) {
std::this_thread::sleep_for(std::chrono::seconds(1));
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - last_time).count();
if (elapsed > 0) {
uint64_t current_requests = total_requests.load();
uint64_t current_bytes = total_bytes.load();
uint64_t req_per_sec = (current_requests - last_requests) / elapsed;
double mb_per_sec = (current_bytes - last_bytes) / 1024.0 / 1024.0 / elapsed;
std::cout << "QPS: " << req_per_sec
<< ", Throughput: " << mb_per_sec << " MB/s" << std::endl;
last_requests = current_requests;
last_bytes = current_bytes;
last_time = now;
}
}
});
// 运行事件循环
std::vector<std::thread> thread_pool;
for (int i = 0; i < threads; ++i) {
thread_pool.emplace_back([&io_context]() {
io_context.run();
});
}
// 等待用户输入停止
std::cin.get();
running = false;
stats_thread.join();
for (auto& t : thread_pool) {
t.join();
}
return 0;
}
6.2 使用专业压测工具
wrk 是一个高效的 HTTP 压测工具:
# 安装
git clone https://github.com/wg/wrk.git
cd wrk && make
# 使用 12 线程、1000 连接、持续 30 秒
wrk -t12 -c1000 -d30s http://localhost:8080/
# 输出示例
Running 30s test @ http://localhost:8080/
12 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 15.32ms 12.45ms 150.00ms 78.32%
Req/Sec 5.45k 1.23k 8.90k 71.23%
1952345 requests in 30.02s, 1.45GB read
Requests/sec: 65031.25
Transfer/sec: 49.32MB
TcpKali 专门用于 TCP 压测:
# 安装
brew install tcpkali # macOS
# 或编译: https://github.com/machinezone/tcpkali
# 1000 并发连接,持续 60 秒
tcpkali -c 1000 -d 60s -e "Hello, Server!" 127.0.0.1:8080
6.3 压测指标
| 指标 | 说明 | 评估标准 |
|---|---|---|
| QPS | 每秒请求数 | 越高越好 |
| 延迟 | 请求响应时间 | P99 < 100ms 较好 |
| 吞吐量 | 每秒数据量 | 取决于业务 |
| 连接数 | 同时活跃连接 | 验证连接管理 |
| CPU 使用率 | 服务端 CPU 占用 | < 80% 较健康 |
6.4 压测注意事项
- 客户端机器:确保压测客户端性能足够,不要成为瓶颈
- 网络环境:本地测试避免网络延迟干扰,生产测试考虑真实网络
- 资源监控:监控服务端 CPU、内存、网络、磁盘 IO
- 预热:测试前先预热,让系统进入稳定状态
- 多次测试:多次测试取平均值,排除偶然因素
七、总结
本文介绍了 Boost.Asio 的三种典型用法:
- 单线程模型:简单易用,适合低并发场景
- 单 io_context + 线程池:中等复杂度,适合中等并发
- 多 io_context 模型:高性能无锁设计,适合高并发场景
同时介绍了压力测试的方法,包括自定义压测客户端和专业工具的使用。
选择哪种模式取决于实际需求,建议从简单开始,根据性能测试结果逐步优化。