Breaking the Rust Loop: Building Your First Async Data Pipeline (for Data Engineers)
From Python Scripts to Rust Binaries: How to map your ETL mental models to Rust's async ecosystem.

As a Data Engineer, I often found myself stuck in the "Rust Tutorial Loop." I’d learn the syntax, fight the borrow checker, and then forget it all because I didn't have a practical use case.
The breakthrough happened when I stopped trying to learn "Rust" and started trying to build "Data Engineering tools" with Rust.
In this post, we will build a classic Extract-Load (EL) pipeline. Instead of Python, Requests, and Pandas, we will use Rust, Reqwest, and SQLite. We will fetch real-time Bitcoin data from a public API, validate the schema strictly (something Python struggles with), and archive it into a database.
Why Rust for Data Engineering?
Schema Safety: If the API changes its data format, our code fails before processing, not after filling the database with garbage.
Single Binary: No virtual environments or pip install. The final output is a single file you can drop onto a server or Lambda function.
Performance: It handles high-throughput async tasks with a fraction of the memory Python requires.
The Stack
We are using the "Big Four" crates (libraries) for Rust data ops:
reqwest: The HTTP client (Rust's requests).
tokio: The async runtime (handles concurrency).
serde: Serialization/Deserialization (turns JSON into Structs).
rusqlite: Lightweight SQLite interaction.
Step 1: Project Setup
First, initialize a new project and add the dependencies.
# Create the project
cargo new crypto_ingestor
cd crypto_ingestor
# Add dependencies via CLI (modern Cargo)
cargo add reqwest --features json
cargo add tokio --features full
cargo add rusqlite --features bundled
cargo add serde --features derive
cargo add serde_json
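After running these commands, the dependency section of Cargo.toml should look roughly like this (the version numbers below are illustrative, not pinned recommendations; `cargo add` will pick the latest at the time you run it):

```toml
[dependencies]
reqwest = { version = "0.12", features = ["json"] }
tokio = { version = "1", features = ["full"] }
rusqlite = { version = "0.31", features = ["bundled"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
```

Note that serde_json is listed explicitly: serde provides the Deserialize derive, but the serde_json crate does the actual JSON parsing we use later.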
Step 2: Define the Data Contract
In Python, we might just grab data['bitcoin']['usd']. In Rust, we define the structure upfront. This is our contract. If the API violates this, the pipeline stops immediately.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct CoinGeckoResponse {
    bitcoin: PriceData,
}

#[derive(Debug, Deserialize)]
struct PriceData {
    usd: f64,
    usd_market_cap: f64,
}
Step 3: The "Gotcha" (Handling Real World APIs)
Public APIs often block generic HTTP clients. During development, I kept hitting 403 Forbidden responses (which surfaced as schema errors, since the error page isn't the JSON we expect) because the API rejected the default User-Agent.
Here is the robust, production-ready fetch function. Note how we handle errors: instead of a generic traceback, we log the raw response if parsing fails (acting like a mini Dead Letter Queue).
use std::error::Error;

async fn fetch_price() -> std::result::Result<CoinGeckoResponse, Box<dyn Error>> {
    let url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true";

    // 1. Build a client so we can set headers (mimic a browser)
    let client = reqwest::Client::new();
    let response = client
        .get(url)
        .header("User-Agent", "MyCryptoIngestor/1.0")
        .send()
        .await?;

    // 2. Check the status code before parsing
    if !response.status().is_success() {
        return Err(Box::from(format!(
            "API request failed with status: {}",
            response.status()
        )));
    }

    // 3. Grab the raw text so we can debug schema failures
    let text_content = response.text().await?;

    // 4. Parse the JSON, logging the raw payload if the contract is violated
    let parsed_data: CoinGeckoResponse = match serde_json::from_str(&text_content) {
        Ok(data) => data,
        Err(e) => {
            eprintln!("⚠️ RAW RESPONSE RECEIVED: {}", text_content);
            return Err(Box::new(e));
        }
    };

    Ok(parsed_data)
}
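The log-then-fail pattern in step 4 isn't reqwest-specific; it works for any fallible parse. Here is a std-only sketch of the same "mini Dead Letter Queue" idea (the `parse_price` helper and its `BTC=...` input format are invented for illustration): on a malformed record we print the raw input, then propagate a boxed error so the caller's `?` still works.

```rust
use std::error::Error;

// Hypothetical helper: parse a raw "BTC=90610.55" record into a price.
fn parse_price(raw: &str) -> Result<f64, Box<dyn Error>> {
    let value = match raw.strip_prefix("BTC=") {
        Some(v) => v,
        None => {
            // Mini Dead Letter Queue: log the offending record before failing
            eprintln!("⚠️ RAW RECORD RECEIVED: {}", raw);
            return Err(Box::from("record missing BTC= prefix"));
        }
    };
    // str::parse's ParseFloatError auto-converts into Box<dyn Error> via `?`
    let price: f64 = value.trim().parse()?;
    Ok(price)
}

fn main() {
    assert_eq!(parse_price("BTC=90610.55").unwrap(), 90610.55);
    assert!(parse_price("ETH=3100.0").is_err());
    assert!(parse_price("BTC=not-a-number").is_err());
    println!("parse checks passed");
}
```

The key ergonomic is `Box<dyn Error>` as the error type: every concrete error (HTTP, parse, JSON) converts into it with `?`, so one return type covers the whole pipeline stage.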
Step 4: Database Operations & Verification
We need to create the table if it doesn't exist, insert the data, and then—to prove it worked—read it back within the same app.
use rusqlite::{Connection, Result};

fn init_db() -> Result<Connection> {
    let conn = Connection::open("prices.db")?;
    conn.execute(
        "CREATE TABLE IF NOT EXISTS bitcoin_prices (
            id INTEGER PRIMARY KEY,
            price_usd REAL,
            market_cap REAL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )",
        [],
    )?;
    Ok(conn)
}

fn read_history(conn: &Connection) -> Result<()> {
    let mut stmt = conn.prepare(
        "SELECT id, price_usd, timestamp FROM bitcoin_prices ORDER BY id DESC LIMIT 5",
    )?;

    // Map each row to an (id, price, timestamp) tuple
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, i32>(0)?,    // id
            row.get::<_, f64>(1)?,    // price
            row.get::<_, String>(2)?, // timestamp
        ))
    })?;

    println!("\n📊 Recent History (Last 5 records):");
    println!("-------------------------------------");
    for row_result in rows {
        let (id, price, time) = row_result?;
        println!("[{}] {} | Price: ${:.2}", id, time, price);
    }
    Ok(())
}
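Note that `query_map` hands back an iterator of `Result` rows rather than a plain Vec: a single corrupt row stops the read instead of being silently skipped. The same pattern can be exercised with std types only (the rows below are fabricated stand-ins for database output, not real rusqlite values):

```rust
// Simulated rows: each one may individually fail, like query_map's output
fn rows() -> Vec<Result<(i32, f64), String>> {
    vec![
        Ok((1, 90610.55)),
        Ok((2, 90712.10)),
        Err("row 3 is corrupt".to_string()),
    ]
}

fn read_all() -> Result<Vec<(i32, f64)>, String> {
    // collect() on an iterator of Results short-circuits at the first Err,
    // mirroring what `row_result?` does inside read_history's loop
    rows().into_iter().collect()
}

fn main() {
    // The corrupt third row aborts the whole read
    assert_eq!(read_all(), Err("row 3 is corrupt".to_string()));

    // With only healthy rows, we get the whole batch back
    let healthy: Result<Vec<(i32, f64)>, String> =
        vec![Ok((1, 90610.55)), Ok((2, 90712.10))].into_iter().collect();
    assert_eq!(healthy.unwrap().len(), 2);
    println!("row checks passed");
}
```

Whether fail-fast is what you want is a design choice; for an archive table, loudly stopping on a bad row beats quietly losing it.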
Step 5: The Full Pipeline (Putting it together)
Here is the complete src/main.rs file.
use rusqlite::{params, Connection, Result};
use serde::Deserialize;
use std::error::Error;

// --- Data Models ---
#[derive(Debug, Deserialize)]
struct CoinGeckoResponse {
    bitcoin: PriceData,
}

#[derive(Debug, Deserialize)]
struct PriceData {
    usd: f64,
    usd_market_cap: f64,
}

// --- Database Functions ---
fn init_db() -> Result<Connection> {
    let conn = Connection::open("prices.db")?;
    conn.execute(
        "CREATE TABLE IF NOT EXISTS bitcoin_prices (
            id INTEGER PRIMARY KEY,
            price_usd REAL,
            market_cap REAL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )",
        [],
    )?;
    Ok(conn)
}

fn read_history(conn: &Connection) -> Result<()> {
    let mut stmt = conn.prepare(
        "SELECT id, price_usd, timestamp FROM bitcoin_prices ORDER BY id DESC LIMIT 5",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, i32>(0)?,
            row.get::<_, f64>(1)?,
            row.get::<_, String>(2)?,
        ))
    })?;

    println!("\n📊 Recent History (Last 5 records):");
    println!("-------------------------------------");
    for row_result in rows {
        let (id, price, time) = row_result?;
        println!("[{}] {} | Price: ${:.2}", id, time, price);
    }
    Ok(())
}

// --- Extract Function ---
async fn fetch_price() -> std::result::Result<CoinGeckoResponse, Box<dyn Error>> {
    let url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true";
    let client = reqwest::Client::new();
    let response = client
        .get(url)
        .header("User-Agent", "MyCryptoIngestor/1.0")
        .send()
        .await?;

    if !response.status().is_success() {
        return Err(Box::from(format!(
            "API request failed with status: {}",
            response.status()
        )));
    }

    let text_content = response.text().await?;
    let parsed_data: CoinGeckoResponse = match serde_json::from_str(&text_content) {
        Ok(data) => data,
        Err(e) => {
            eprintln!("⚠️ RAW RESPONSE RECEIVED: {}", text_content);
            return Err(Box::new(e));
        }
    };
    Ok(parsed_data)
}

// --- Main Orchestration ---
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn Error>> {
    println!("🚀 Starting Ingestion Pipeline...");

    // 1. Initialize DB
    let conn = init_db()?;

    // 2. Fetch Data (Extract)
    println!("📥 Fetching data from API...");
    match fetch_price().await {
        Ok(data) => {
            let price = data.bitcoin.usd;
            let cap = data.bitcoin.usd_market_cap;
            println!("✅ Data received: BTC at ${}", price);

            // 3. Insert into DB (Load)
            conn.execute(
                "INSERT INTO bitcoin_prices (price_usd, market_cap) VALUES (?1, ?2)",
                params![price, cap],
            )?;
            println!("💾 Saved to SQLite database.");

            // 4. Verification
            read_history(&conn)?;
        }
        Err(e) => {
            eprintln!("❌ Pipeline Failed: {}", e);
        }
    }
    Ok(())
}
Running the Pipeline
To run the application:
cargo run
Expected Output:
🚀 Starting Ingestion Pipeline...
📥 Fetching data from API...
✅ Data received: BTC at $90610
💾 Saved to SQLite database.
📊 Recent History (Last 5 records):
-------------------------------------
[1] 2024-01-10 12:30:01 | Price: $90610.00
Conclusion
We just built a fully functional data app in Rust. It handles network failures, parses JSON into strong types, and persists data to disk.
For a Data Engineer, the biggest shift is moving from "scripting" (run the code and hope it works) to "systems programming" (handle every possible error state upfront). It's stricter, but the result is a pipeline that lets you sleep soundly at night.

