
Breaking the Rust Loop: Building Your First Async Data Pipeline (for Data Engineers)

From Python Scripts to Rust Binaries: How to map your ETL mental models to Rust's async ecosystem.


As a Data Engineer, I often found myself stuck in the "Rust Tutorial Loop." I’d learn the syntax, fight the borrow checker, and then forget it all because I didn't have a practical use case.

The breakthrough happened when I stopped trying to learn "Rust" and started trying to build "Data Engineering tools" with Rust.

In this post, we will build a classic Extract-Load (EL) pipeline. Instead of Python, Requests, and Pandas, we will use Rust, Reqwest, and SQLite. We will fetch real-time Bitcoin data from a public API, validate the schema strictly (something Python struggles with), and archive it into a database.

Why Rust for Data Engineering?

  • Schema Safety: If the API changes its data format, our code fails before processing, not after filling the database with garbage.

  • Single Binary: No virtual environments or pip install. The final output is a single file you can drop onto a server or Lambda function.

  • Performance: It handles high-throughput async tasks with a fraction of the memory Python requires.

The Stack

We are using the "Big Four" crates (libraries) for Rust data ops:

  1. reqwest: The HTTP client (Rust's requests).

  2. tokio: The async runtime (handles concurrency).

  3. serde: Serialization/Deserialization (turns JSON into Structs).

  4. rusqlite: Lightweight SQLite interaction.

Step 1: Project Setup

First, initialize a new project and add the dependencies.

# Create the project
cargo new crypto_ingestor
cd crypto_ingestor

# Add dependencies via CLI (modern Cargo)
cargo add reqwest --features json
cargo add tokio --features full
cargo add rusqlite --features bundled
cargo add serde --features derive
cargo add serde_json
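
For reference, the resulting [dependencies] section of Cargo.toml should look roughly like this (version numbers are illustrative; pin whatever the `cargo add` commands resolve for you). Note that `serde_json` is needed because we call `serde_json::from_str` manually later:

```toml
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
rusqlite = { version = "0.29", features = ["bundled"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
```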

Step 2: Define the Data Contract

In Python, we might just grab data['bitcoin']['usd']. In Rust, we define the structure upfront. This is our contract. If the API violates this, the pipeline stops immediately.

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct CoinGeckoResponse {
    bitcoin: PriceData,
}

#[derive(Debug, Deserialize)]
struct PriceData {
    usd: f64,
    usd_market_cap: f64,
}

Step 3: The "Gotcha" (Handling Real World APIs)

Public APIs often block generic HTTP clients. During development, I ran into 403 Forbidden responses because the API rejected the default User-Agent.

Here is a more robust fetch function. Note how we handle errors: instead of a generic traceback, we log the raw response body if parsing fails (acting like a mini dead-letter queue).

use std::error::Error;

async fn fetch_price() -> std::result::Result<CoinGeckoResponse, Box<dyn Error>> {
    let url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true";

    // 1. Build a client to set headers (mimic a browser)
    let client = reqwest::Client::new();

    let response = client
        .get(url)
        .header("User-Agent", "MyCryptoIngestor/1.0") 
        .send()
        .await?;

    // 2. Check Status Code before parsing
    if !response.status().is_success() {
        return Err(Box::from(format!("API Request failed with status: {}", response.status())));
    }

    // 3. Get raw text to debug schema failures
    let text_content = response.text().await?;

    // 4. Parse JSON
    let parsed_data: CoinGeckoResponse = match serde_json::from_str(&text_content) {
        Ok(data) => data,
        Err(e) => {
            eprintln!("⚠️ RAW RESPONSE RECEIVED: {}", text_content);
            return Err(Box::new(e));
        }
    };

    Ok(parsed_data)
}
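
A quick aside on the return type: `Box<dyn Error>` lets one function bubble up errors from different libraries (reqwest's HTTP errors, serde_json's parse errors) through a single `?`-friendly signature. A minimal std-only sketch of the same pattern, using a hypothetical `parse_port` helper:

```rust
use std::error::Error;

// Any error type implementing std::error::Error can be boxed,
// so different failure kinds flow through the same signature via `?`.
fn parse_port(raw: &str) -> Result<u16, Box<dyn Error>> {
    let port: u16 = raw.trim().parse()?; // ParseIntError is boxed automatically
    if port == 0 {
        // Ad-hoc errors can be built from plain strings.
        return Err(Box::from("port must be non-zero"));
    }
    Ok(port)
}

fn main() -> Result<(), Box<dyn Error>> {
    assert_eq!(parse_port(" 8080 ")?, 8080);
    assert!(parse_port("not-a-number").is_err());
    assert!(parse_port("0").is_err());
    println!("error propagation works");
    Ok(())
}
```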

Step 4: Database Operations & Verification

We need to create the table if it doesn't exist, insert the data, and then—to prove it worked—read it back within the same app.

use rusqlite::{params, Connection, Result};

fn init_db() -> Result<Connection> {
    let conn = Connection::open("prices.db")?;
    conn.execute(
        "CREATE TABLE IF NOT EXISTS bitcoin_prices (
            id INTEGER PRIMARY KEY,
            price_usd REAL,
            market_cap REAL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )",
        [],
    )?;
    Ok(conn)
}

fn read_history(conn: &Connection) -> Result<()> {
    let mut stmt = conn.prepare("SELECT id, price_usd, timestamp FROM bitcoin_prices ORDER BY id DESC LIMIT 5")?;

    // Map rows to a tuple
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, i32>(0)?,    // id
            row.get::<_, f64>(1)?,    // price
            row.get::<_, String>(2)?  // timestamp
        ))
    })?;

    println!("\n📊 Recent History (Last 5 records):");
    println!("-------------------------------------");

    for row_result in rows {
        let (id, price, time) = row_result?;
        println!("[{}] {} | Price: ${:.2}", id, time, price);
    }

    Ok(())
}

Step 5: The Full Pipeline (Putting it together)

Here is the complete src/main.rs file.

use rusqlite::{params, Connection, Result};
use serde::Deserialize;
use std::error::Error;

// --- Data Models ---
#[derive(Debug, Deserialize)]
struct CoinGeckoResponse {
    bitcoin: PriceData,
}

#[derive(Debug, Deserialize)]
struct PriceData {
    usd: f64,
    usd_market_cap: f64,
}

// --- Database Functions ---
fn init_db() -> Result<Connection> {
    let conn = Connection::open("prices.db")?;
    conn.execute(
        "CREATE TABLE IF NOT EXISTS bitcoin_prices (
            id INTEGER PRIMARY KEY,
            price_usd REAL,
            market_cap REAL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )",
        [],
    )?;
    Ok(conn)
}

fn read_history(conn: &Connection) -> Result<()> {
    let mut stmt = conn.prepare("SELECT id, price_usd, timestamp FROM bitcoin_prices ORDER BY id DESC LIMIT 5")?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, i32>(0)?,
            row.get::<_, f64>(1)?,
            row.get::<_, String>(2)?
        ))
    })?;

    println!("\n📊 Recent History (Last 5 records):");
    println!("-------------------------------------");
    for row_result in rows {
        let (id, price, time) = row_result?;
        println!("[{}] {} | Price: ${:.2}", id, time, price);
    }
    Ok(())
}

// --- Extract Function ---
async fn fetch_price() -> std::result::Result<CoinGeckoResponse, Box<dyn Error>> {
    let url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true";
    let client = reqwest::Client::new();

    let response = client
        .get(url)
        .header("User-Agent", "MyCryptoIngestor/1.0") 
        .send()
        .await?;

    if !response.status().is_success() {
        return Err(Box::from(format!("API Request failed with status: {}", response.status())));
    }

    let text_content = response.text().await?;
    let parsed_data: CoinGeckoResponse = match serde_json::from_str(&text_content) {
        Ok(data) => data,
        Err(e) => {
            eprintln!("⚠️ RAW RESPONSE RECEIVED: {}", text_content);
            return Err(Box::new(e));
        }
    };
    Ok(parsed_data)
}

// --- Main Orchestration ---
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn Error>> {
    println!("🚀 Starting Ingestion Pipeline...");

    // 1. Initialize DB
    let conn = init_db()?;

    // 2. Fetch Data (Extract)
    println!("📥 Fetching data from API...");
    match fetch_price().await {
        Ok(data) => {
            let price = data.bitcoin.usd;
            let cap = data.bitcoin.usd_market_cap;
            println!("✅ Data received: BTC at ${}", price);

            // 3. Insert into DB (Load)
            conn.execute(
                "INSERT INTO bitcoin_prices (price_usd, market_cap) VALUES (?1, ?2)",
                params![price, cap],
            )?;
            println!("💾 Saved to SQLite database.");

            // 4. Verification
            read_history(&conn)?;
        }
        Err(e) => {
            eprintln!("❌ Pipeline Failed: {}", e);
        }
    }

    Ok(())
}

Running the Pipeline

To run the application:

cargo run

Expected Output:

🚀 Starting Ingestion Pipeline...
📥 Fetching data from API...
✅ Data received: BTC at $90610
💾 Saved to SQLite database.

📊 Recent History (Last 5 records):
-------------------------------------
[1] 2024-01-10 12:30:01 | Price: $90610.00

Conclusion

We just built a fully functional data app in Rust. It handles network failures, parses JSON into strong types, and persists data to disk.

For a Data Engineer, the biggest shift is moving from "Scripting" (where you run code and hope it works) to "Systems Programming" (where you handle every possible error state upfront). It’s stricter, but the result is a pipeline that lets you sleep soundly at night.