🦀 Как написать высоконагруженного WebSocket-клиента на Rust с Tokio


На главную > Блог > Категория > 🦀 Как написать высоконагруженного WebSocket-клиента на Rust с Tokio

websocket_rust

Вступление: почему WebSocket — это сердце real-time трейдинга

Трейдинг — это скорость. Когда цена биткоина меняется за миллисекунды, REST-запросы «раз в секунду» уже не подходят. Нужен WebSocket — постоянное соединение, по которому биржа присылает тики в реальном времени. Но просто подключиться недостаточно. WebSocket-клиент должен быть:

  • Высоконагруженным — обрабатывать тысячи тиков в секунду.
  • 🛡️ Отказоустойчивым — переподключаться при обрыве связи.
  • 🧵 Асинхронным — не блокировать поток на время обработки.
  • 🚀 Быстрым — минимальные задержки между получением тика и отправкой ордера.

Rust с его экосистемой Tokio идеально подходит для этой задачи. В этой статье я покажу, как написать промышленного WebSocket-клиента для Binance, который:

  1. Подключается к нескольким торговым парам одновременно.
  2. Обрабатывает тики асинхронно без блокировок.
  3. Автоматически переподключается при разрыве.
  4. Фильтрует и агрегирует данные для стратегии.
«WebSocket-клиент на Rust + Tokio — это как реактивный двигатель для вашего бота. Он не знает слова «тормоз».

1. Проектирование архитектуры: что нам нужно

Наш клиент будет состоять из нескольких независимых компонентов, общающихся через каналы (tokio::sync::mpsc). Это позволяет масштабировать систему и изолировать ошибки.

Компоненты:

  • 🔌 WebSocket Connection Manager — отвечает за соединение, переподключение и чтение сообщений.
  • 📋 Message Parser — парсит JSON, извлекает цену, объём, время.
  • 📈 Order Book Updater — поддерживает актуальный стакан (опционально).
  • 🧠 Strategy Worker — принимает тики, рассчитывает индикаторы, генерирует сигналы.
  • 💸 Order Sender — отправляет ордеры через REST или второй WebSocket.
🎯 Архитектура async/await: Каждый компонент — отдельная `tokio::task`. Они общаются через каналы, что исключает блокировки и гонки данных.

2. Установка зависимостей


[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.20"
futures-util = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
anyhow = "1"
thiserror = "1"
📦 Что за библиотеки:
  • tokio-tungstenite — асинхронный WebSocket поверх Tungstenite (чистый Rust).
  • tracing — структурированное логирование.
  • anyhow/thiserror — удобная обработка ошибок.

3. Создаём подключение с автоматическим переподключением

Главная проблема WebSocket — он может разорваться из-за сети или нагрузки на сервере. Поэтому нужен механизм reconnect.


use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use futures_util::{SinkExt, StreamExt};
use tokio::time::{sleep, Duration};
use tracing::{info, warn, error};

pub async fn connect_websocket(
    url: &str,
    on_message: impl Fn(Message) + Send + Sync + 'static,
) {
    loop {
        match connect_async(url).await {
            Ok((ws_stream, _)) => {
                info!("✅ WebSocket подключён: {}", url);
                let (_, mut read) = ws_stream.split();
                
                while let Some(msg) = read.next().await {
                    match msg {
                        Ok(msg) => on_message(msg),
                        Err(e) => {
                            error!("Ошибка чтения WebSocket: {}", e);
                            break;
                        }
                    }
                }
                warn!("Соединение потеряно, переподключение через 3 секунды...");
            }
            Err(e) => {
                error!("Не удалось подключиться: {}. Повтор через 3 секунды...", e);
            }
        }
        sleep(Duration::from_secs(3)).await;
    }
}

4. Структуры данных и парсинг сообщений Binance

Binance присылает тики в формате:


{
  "e": "trade",
  "E": 123456789,
  "s": "BTCUSDT",
  "t": 123456,
  "p": "67000.50",
  "q": "0.123",
  "T": 123456789,
  "m": false
}
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize)]
pub struct TradeEvent {
    #[serde(rename = "e")]
    pub event_type: String,
    #[serde(rename = "E")]
    pub event_time: u64,
    #[serde(rename = "s")]
    pub symbol: String,
    #[serde(rename = "p")]
    pub price: String,
    #[serde(rename = "q")]
    pub quantity: String,
}

impl TradeEvent {
    pub fn parse_price(&self) -> f64 {
        self.price.parse().unwrap_or(0.0)
    }
    
    pub fn parse_quantity(&self) -> f64 {
        self.quantity.parse().unwrap_or(0.0)
    }
}

5. Многоканальная обработка: разделение задач

Создадим канал, через который WebSocket будет передавать тики в стратегию. Это позволяет не блокировать чтение WebSocket тяжёлыми вычислениями.


use tokio::sync::mpsc::{channel, Sender, Receiver};

#[derive(Debug, Clone)]
pub enum MarketEvent {
    Trade(TradeEvent),
    // можно добавить OrderBook, Kline и т.д.
}

pub async fn start_websocket_client(
    symbols: Vec,
    tx: Sender<MarketEvent>,
) -> anyhow::Result<()> {
    let streams: Vec<String> = symbols
        .iter()
        .map(|s| format!("{}@trade", s.to_lowercase()))
        .collect();
    
    let url = format!("wss://stream.binance.com:9443/stream?streams={}", streams.join("/"));
    
    tokio::spawn(async move {
        connect_websocket(&url, move |msg| {
            let tx = tx.clone();
            tokio::spawn(async move {
                if msg.is_text() {
                    if let Ok(text) = msg.to_text() {
                        if let Ok(event) = parse_trade_event(text) {
                            let _ = tx.send(MarketEvent::Trade(event)).await;
                        }
                    }
                }
            });
        }).await;
    });
    
    Ok(())
}

fn parse_trade_event(data: &str) -> anyhow::Result<TradeEvent> {
    let value: serde_json::Value = serde_json::from_str(data)?;
    // Binance оборачивает в поле "data"
    let trade_data = value["data"].as_str().unwrap();
    Ok(serde_json::from_str(trade_data)?)
}

6. Стратегия: асинхронный worker с расчётом индикаторов

Стратегия получает тики из канала, обновляет индикаторы и, при выполнении условий, отправляет сигнал в канал ордеров.


use std::collections::VecDeque;

struct SMACrossover {
    prices: VecDeque<f64>,
    short_period: usize,
    long_period: usize,
}

impl SMACrossover {
    fn new(short: usize, long: usize) -> Self {
        Self {
            prices: VecDeque::with_capacity(long + 1),
            short_period: short,
            long_period: long,
        }
    }
    
    fn add_price(&mut self, price: f64) -> Option<Signal> {
        self.prices.push_back(price);
        if self.prices.len() > self.long_period {
            self.prices.pop_front();
        }
        
        if self.prices.len() >= self.long_period {
            let sma_short = self.sma(self.short_period);
            let sma_long = self.sma(self.long_period);
            
            if sma_short > sma_long {
                Some(Signal::Buy)
            } else {
                Some(Signal::Sell)
            }
        } else {
            None
        }
    }
    
    fn sma(&self, period: usize) -> f64 {
        let len = self.prices.len();
        let start = len - period;
        let sum: f64 = self.prices.range(start..).sum();
        sum / period as f64
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Signal {
    Buy,
    Sell,
    Hold,
}

7. Отправка ордеров через REST (асинхронно)

Когда стратегия генерирует сигнал, отправляем ордер через отдельный канал. Это позволяет избежать блокировки WebSocket.


use reqwest::Client;

async fn send_order(symbol: &str, side: &str, quantity: f64, api_key: &str, api_secret: &str) -> anyhow::Result<()> {
    let client = Client::new();
    let params = [
        ("symbol", symbol),
        ("side", side),
        ("type", "MARKET"),
        ("quantity", &quantity.to_string()),
    ];
    
    let response = client
        .post("https://api.binance.com/api/v3/order")
        .header("X-MBX-APIKEY", api_key)
        .form(&params)
        .send()
        .await?;
    
    if response.status().is_success() {
        info!("Ордер отправлен: {} {}", side, quantity);
    } else {
        error!("Ошибка отправки ордера: {}", response.text().await?);
    }
    
    Ok(())
}

8. Собираем всё вместе: главная функция


#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();
    
    let symbols = vec!["BTCUSDT".to_string(), "ETHUSDT".to_string()];
    let (market_tx, mut market_rx) = channel::<MarketEvent>(1000);
    let (signal_tx, mut signal_rx) = channel::<Signal>(100);
    
    // Запуск WebSocket клиента
    start_websocket_client(symbols.clone(), market_tx).await?;
    
    // Запуск стратегий для каждой пары
    for symbol in symbols {
        let mut strategy = SMACrossover::new(10, 30);
        let mut rx = market_rx.resubscribe(); // разделяем канал
        let tx = signal_tx.clone();
        
        tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                if let MarketEvent::Trade(trade) = event {
                    let price = trade.parse_price();
                    if let Some(signal) = strategy.add_price(price) {
                        if signal != Signal::Hold {
                            info!("{}: {:?}", trade.symbol, signal);
                            let _ = tx.send(signal).await;
                        }
                    }
                }
            }
        });
    }
    
    // Запуск обработчика сигналов (отправка ордеров)
    tokio::spawn(async move {
        while let Some(signal) = signal_rx.recv().await {
            match signal {
                Signal::Buy => {
                    let _ = send_order("BTCUSDT", "BUY", 0.01, "KEY", "SECRET").await;
                }
                Signal::Sell => {
                    let _ = send_order("BTCUSDT", "SELL", 0.01, "KEY", "SECRET").await;
                }
                _ => {}
            }
        }
    });
    
    // Ждём сигнала завершения (например, Ctrl+C)
    tokio::signal::ctrl_c().await?;
    info!("Бот остановлен");
    
    Ok(())
}

9. Оптимизация производительности

Избегайте аллокаций в горячем пути


// Плохо: создание новой строки на каждый тик
let price_str = trade.price.clone();
let price: f64 = price_str.parse().unwrap();

// Хорошо: парсим из &str
let price: f64 = trade.price.parse().unwrap_or(0.0);

Используйте маленький канал с фиксированным размером

Если канал переполняется, подтормаживайте чтение — лучше потерять несколько тиков, чем упасть по памяти.

Парсинг JSON с помощью simd-json

Для высоких нагрузок замените serde_json на simd-json (до 2-3 раз быстрее).

Заключение: ваш high-load WebSocket клиент готов

Вы создали промышленного WebSocket-клиента, который:

  • ✅ Подключается к нескольким торговым парам одновременно.
  • ✅ Автоматически переподключается при обрыве.
  • ✅ Обрабатывает тики асинхронно через каналы.
  • ✅ Рассчитывает индикаторы и отправляет ордеры в реальном времени.

Такая архитектура выдерживает тысячи тиков в секунду и работает 24/7 без утечек памяти. Rust + Tokio дают вам предсказуемую производительность и безопасность, недоступную в Python или Java.

Настройте парсинг под свои нужды, добавьте order book или kline, интегрируйте свою стратегию — и вы получите бота, способного конкурировать с лучшими HFT-системами.

«WebSocket-клиент на Rust — это как гиперкару: быстрый, надёжный и не прощает ошибок в управлении. Но если вы освоите — обгоните всех».

 

Дата размещения статьи: 09-06-2026 в 09:17:06