Building scalable data-driven applications using Rust
Data-driven applications cần xử lý, phân tích và serve dữ liệu một cách nhanh chóng và đáng tin cậy. Rust với performance cao, memory safety và concurrency mạnh mẽ là lựa chọn tuyệt vời để xây dựng các ứng dụng data-driven có khả năng scale.
Trong bài này, chúng ta sẽ tìm hiểu cách xây dựng một ứng dụng data-driven hoàn chỉnh với các thành phần:
- Data ingestion và processing
- Storage và indexing
- Query engine
- API để serve data
Kiến trúc ứng dụng Data-Driven
┌─────────────┐
│ Data Sources│ (CSV, JSON, API, Database)
└──────┬──────┘
│
▼
┌─────────────┐
│ Ingestion │ (tokio, reqwest, async-std)
└──────┬──────┘
│
▼
┌─────────────┐
│ Processing │ (polars, arrow, datafusion)
└──────┬──────┘
│
▼
┌─────────────┐
│ Storage │ (parquet, sled, rocksdb)
└──────┬──────┘
│
▼
┌─────────────┐
│ Query Engine│ (datafusion, tantivy)
└──────┬──────┘
│
▼
┌─────────────┐
│ API Server │ (actix-web, axum, rocket)
└─────────────┘
Ví dụ 1: Data Ingestion với Tokio
Xây dựng service để ingest data từ nhiều nguồn:
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use reqwest;
use anyhow::Result;
/// Đọc data từ file
async fn ingest_from_file(path: &str) -> Result<Vec<u8>> {
let mut file = File::open(path).await?;
let mut contents = Vec::new();
file.read_to_end(&mut contents).await?;
Ok(contents)
}
/// Fetch data từ HTTP API
async fn ingest_from_api(url: &str) -> Result<String> {
let response = reqwest::get(url).await?;
let body = response.text().await?;
Ok(body)
}
/// Ingest data từ nhiều nguồn đồng thời
async fn ingest_all() -> Result<()> {
// Chạy parallel
let (file_data, api_data) = tokio::join!(
ingest_from_file("data.csv"),
ingest_from_api("https://api.example.com/data")
);
println!("File data size: {}", file_data?.len());
println!("API data: {}", api_data?);
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
ingest_all().await?;
Ok(())
}
Ví dụ 2: Processing với DataFusion
DataFusion là SQL query engine sử dụng Apache Arrow, cực kỳ nhanh:
[dependencies]
datafusion = "43.0"
tokio = { version = "1.48", features = ["full"] }
use datafusion::prelude::*;
use anyhow::Result;
#[tokio::main]
async fn main() -> Result<()> {
// Tạo SessionContext
let ctx = SessionContext::new();
// Đăng ký Parquet file như một table
ctx.register_parquet("sales", "sales_data.parquet", ParquetReadOptions::default())
.await?;
// Chạy SQL query
let df = ctx.sql(
"SELECT
category,
SUM(revenue) as total_revenue,
AVG(revenue) as avg_revenue,
COUNT(*) as num_transactions
FROM sales
WHERE revenue > 100
GROUP BY category
ORDER BY total_revenue DESC"
).await?;
// Hiển thị kết quả
df.show().await?;
// Hoặc lưu lại thành Parquet
df.write_parquet("output.parquet", None, None).await?;
Ok(())
}
Ưu điểm của DataFusion:
- Tốc độ cực nhanh (columnar processing)
- Hỗ trợ SQL standard
- Có thể scale đến hàng TB dữ liệu
- Zero-copy reads với Arrow
Ví dụ 3: Storage với Parquet
Parquet là format columnar storage rất hiệu quả:
use polars::prelude::*;
use anyhow::Result;
fn save_to_parquet(df: &mut DataFrame, path: &str) -> Result<()> {
let file = std::fs::File::create(path)?;
ParquetWriter::new(&file)
.with_compression(ParquetCompression::Snappy) // Nén data
.finish(df)?;
Ok(())
}
fn read_from_parquet(path: &str) -> Result<DataFrame> {
let df = ParquetReader::new(std::fs::File::open(path)?)
.finish()?;
Ok(df)
}
fn main() -> Result<()> {
// Tạo sample data
let mut df = df! {
"id" => &[1, 2, 3, 4, 5],
"name" => &["Alice", "Bob", "Charlie", "David", "Eve"],
"revenue" => &[1200.50, 800.30, 1500.75, 950.20, 1100.00],
}?;
// Lưu
save_to_parquet(&mut df, "users.parquet")?;
println!("✅ Đã lưu data");
// Đọc lại
let loaded = read_from_parquet("users.parquet")?;
println!("{:?}", loaded);
Ok(())
}
Ví dụ 4: Full-text Search với Tantivy
Tantivy là full-text search engine tương tự Lucene/Elasticsearch:
[dependencies]
tantivy = "0.22"
use tantivy::schema::*;
use tantivy::{doc, Index, IndexWriter, ReloadPolicy};
use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
use anyhow::Result;
fn main() -> Result<()> {
// Định nghĩa schema
let mut schema_builder = Schema::builder();
schema_builder.add_text_field("title", TEXT | STORED);
schema_builder.add_text_field("body", TEXT);
let schema = schema_builder.build();
// Tạo index
let index = Index::create_in_ram(schema.clone());
let mut index_writer: IndexWriter = index.writer(50_000_000)?;
// Index documents
let title = schema.get_field("title").unwrap();
let body = schema.get_field("body").unwrap();
index_writer.add_document(doc!(
title => "Rust Programming",
body => "Rust is a systems programming language that runs blazingly fast"
))?;
index_writer.add_document(doc!(
title => "Data Engineering",
body => "Building scalable data pipelines with Rust and Polars"
))?;
index_writer.commit()?;
// Search
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::OnCommitWithDelay)
.try_into()?;
let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![title, body]);
let query = query_parser.parse_query("Rust")?;
let top_docs = searcher.search(&query, &TopDocs::with_limit(10))?;
println!("Tìm thấy {} kết quả:", top_docs.len());
for (_score, doc_address) in top_docs {
let retrieved_doc = searcher.doc(doc_address)?;
println!("{}", schema.to_json(&retrieved_doc));
}
Ok(())
}
Ví dụ 5: API Server với Axum
Xây dựng REST API để serve data:
[dependencies]
axum = "0.7"
tokio = { version = "1.48", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
datafusion = "43.0"
use axum::{
extract::{Query, State},
http::StatusCode,
routing::get,
Json, Router,
};
use datafusion::prelude::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use anyhow::Result;
#[derive(Clone)]
struct AppState {
ctx: Arc<SessionContext>,
}
#[derive(Deserialize)]
struct QueryParams {
category: Option<String>,
min_revenue: Option<f64>,
}
#[derive(Serialize)]
struct SalesRecord {
category: String,
total_revenue: f64,
num_transactions: i64,
}
async fn query_sales(
State(state): State<AppState>,
Query(params): Query<QueryParams>,
) -> Result<Json<Vec<SalesRecord>>, StatusCode> {
// Build SQL query dựa trên params
let mut sql = String::from(
"SELECT category,
SUM(revenue) as total_revenue,
COUNT(*) as num_transactions
FROM sales WHERE 1=1"
);
if let Some(cat) = params.category {
sql.push_str(&format!(" AND category = '{}'", cat));
}
if let Some(min_rev) = params.min_revenue {
sql.push_str(&format!(" AND revenue >= {}", min_rev));
}
sql.push_str(" GROUP BY category");
// Execute query
let df = state.ctx.sql(&sql)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// Convert to JSON (simplified)
let records = vec![
SalesRecord {
category: "Electronics".to_string(),
total_revenue: 5000.0,
num_transactions: 100,
}
];
Ok(Json(records))
}
async fn health_check() -> &'static str {
"OK"
}
#[tokio::main]
async fn main() -> Result<()> {
// Setup DataFusion
let ctx = SessionContext::new();
ctx.register_parquet("sales", "sales_data.parquet", ParquetReadOptions::default())
.await?;
let state = AppState {
ctx: Arc::new(ctx),
};
// Build router
let app = Router::new()
.route("/health", get(health_check))
.route("/api/sales", get(query_sales))
.with_state(state);
// Run server
println!("🚀 Server chạy tại http://localhost:3000");
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?;
Ok(())
}
Test API:
# Health check
curl http://localhost:3000/health
# Query với params
curl "http://localhost:3000/api/sales?category=Electronics&min_revenue=100"
Ví dụ 6: Real-time Processing với Channels
Xử lý data streaming real-time:
use tokio::sync::mpsc;
use tokio::time::{Duration, sleep};
use anyhow::Result;
#[derive(Debug, Clone)]
struct DataPoint {
timestamp: i64,
value: f64,
}
async fn producer(tx: mpsc::Sender<DataPoint>) {
let mut counter = 0;
loop {
let data = DataPoint {
timestamp: counter,
value: (counter as f64) * 1.5,
};
if tx.send(data).await.is_err() {
println!("Receiver dropped");
break;
}
counter += 1;
sleep(Duration::from_millis(100)).await;
}
}
async fn processor(mut rx: mpsc::Receiver<DataPoint>) {
let mut buffer = Vec::new();
let batch_size = 10;
while let Some(data) = rx.recv().await {
buffer.push(data);
if buffer.len() >= batch_size {
// Process batch
let sum: f64 = buffer.iter().map(|d| d.value).sum();
let avg = sum / buffer.len() as f64;
println!("📊 Batch processed: {} records, avg = {:.2}", buffer.len(), avg);
buffer.clear();
}
}
}
#[tokio::main]
async fn main() -> Result<()> {
let (tx, rx) = mpsc::channel(100);
// Spawn producer và processor
let producer_handle = tokio::spawn(producer(tx));
let processor_handle = tokio::spawn(processor(rx));
// Chạy 5 giây
sleep(Duration::from_secs(5)).await;
producer_handle.abort();
processor_handle.await?;
Ok(())
}
Best Practices
1. Sử dụng Columnar Format
#![allow(unused)]
fn main() {
// ✅ Tốt: Parquet (columnar)
ParquetWriter::new(&file).finish(&mut df)?;
// ❌ Tránh: CSV cho large datasets
// CSV rất chậm và tốn memory
}
2. Lazy Evaluation
#![allow(unused)]
fn main() {
// ✅ Tốt: Lazy evaluation
let result = df.lazy()
.filter(...)
.select(...)
.collect()?; // Chỉ execute 1 lần
// ❌ Tránh: Eager evaluation
let df = df.filter(...)?; // Execute ngay
let df = df.select(...)?; // Execute lại
}
3. Batch Processing
#![allow(unused)]
fn main() {
// ✅ Tốt: Process theo batch
for chunk in data.chunks(10_000) {
process_chunk(chunk);
}
// ❌ Tránh: Process từng item
for item in data {
process_item(item); // Chậm!
}
}
4. Connection Pooling
#![allow(unused)]
fn main() {
use deadpool_postgres::{Config, Pool};
// ✅ Tốt: Dùng connection pool
let pool = config.create_pool(None, NoTls)?;
let client = pool.get().await?;
// ❌ Tránh: Tạo connection mỗi lần query
}
Monitoring và Observability
use std::time::Instant;
fn main() -> Result<()> {
let start = Instant::now();
// Process data
let df = process_data()?;
println!("⏱️ Processing time: {:?}", start.elapsed());
println!("📊 Records processed: {}", df.height());
println!("💾 Memory used: {} MB", df.estimated_size() / 1_000_000);
Ok(())
}
So sánh Performance
| Operation | Python/Pandas | Rust/Polars | Speedup |
|---|---|---|---|
| Read CSV (1GB) | 5.2s | 0.8s | 6.5x |
| GroupBy + Agg | 3.1s | 0.4s | 7.8x |
| Join (10M rows) | 12.5s | 1.2s | 10.4x |
| Write Parquet | 2.8s | 0.5s | 5.6x |
Tổng kết
Rust cung cấp ecosystem mạnh mẽ để xây dựng data-driven applications:
- ✅ DataFusion: SQL query engine cực nhanh
- ✅ Polars: DataFrame library như Pandas nhưng nhanh hơn nhiều
- ✅ Tantivy: Full-text search engine
- ✅ Arrow: Columnar format chuẩn công nghiệp
- ✅ Tokio: Async runtime cho concurrent processing
- ✅ Axum/Actix: Web frameworks để serve data
Với Rust, bạn có thể xây dựng ứng dụng vừa nhanh, vừa safe, vừa dễ maintain và scale.