>> Justifying Rust

Before I start my story: this is not a language battle over which is "better" but a chance to see how a Rust equivalent compares with a Go implementation in terms of performance, given Rust's promise of speed without a garbage collector and its precise memory management. After this challenge, I can see how good Go's automatic memory management is, even if my implementation was able to outperform it. Preamble aside, I hope this journey is informative and adds some perspective on what Rust might need.

It started when a person asked for help with porting a Go gist to Rust and got a significantly worse performance profile despite how close the translation was. The gist was a solution to the One Billion Row Challenge by Corlin Palmer, a straightforward implementation with some clever optimizations. This piqued my interest, and I decided to take a stab at the challenge. Since the challenge deadline is over and it was mainly for Java, my goal is only to beat this Go implementation on even terms, not to optimize beyond it with techniques such as mmap, io_uring or SIMD.

The challenge is simply parsing a generated text file with two data columns (weather station city and temperature measurement) and one billion rows, calculating aggregates (min, mean and max temperature per city) and printing the output in a JSON-like format. The generated file is around 15 GB and can be created with the non-authoritative Python script from the main repository:

git clone https://github.com/gunnarmorling/1brc
cd 1brc

cd src/main/python
python create_measurements.py 1_000_000_000

mv ../../../data/measurements.txt $HOME/measurements.txt
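
Each row of the generated file is a weather station name and a temperature reading separated by a semicolon. A few illustrative rows (the station names and values here are arbitrary):

Hamburg;12.0
Bulawayo;8.9
Palembang;38.8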

To set the target, running the Go code on this massive file:

wget 'https://gist.githubusercontent.com/corlinp/176a97c58099bca36bcd5679e68f9708/raw/5028f54e727f0f1716a2195de922460949a759fd/1brc_corlinp.go'

time go run 1brc_corlinp.go $HOME/measurements.txt
# real          0m24.533s

On my AMD Ryzen 7 machine with 32 GB RAM and 16 logical cores, it takes an amazingly fast 24.5 seconds, which is my time to beat. (It does break the rules a bit by fitting to a single dataset and assuming a smaller city length, but who cares, this is for fun.) Since I rarely tweak for performance, this is not my expertise; I enjoy Rust for its safety and ergonomics above all. This is to say my techniques are simple, and more technical readers may prefer the more efficient implementations out there, but I do assume the reader is comfortable enough with the borrow checker. In this article, I will journey from the most basic solution through different leads and end with the winning solution.

>> Baseline Solution (5m26s)

Reference: Commit | main.rs

A basic solution to this challenge is to read the file incrementally per line to avoid loading the whole file into memory, parsing each line and then storing the results in a hash map of cities and their aggregate calculations. Starting with a rough implementation:

use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};

const KB: usize = 1024;

/// The Go code uses a 256 KB buffer size.
/// The default buffer size is 8 KB.
const BUF_SIZE: usize = 256 * KB;

fn main() {
    // Read the first commandline argument as the file path
    let path = std::env::args().nth(1).expect("no path given");

    // Use a buffered line reader over the file
    let file = File::open(path).expect("file does not exist");
    let mut lines = BufReader::with_capacity(BUF_SIZE, file).lines();

    // Collect the results using a HashMap
    let mut aggregate_map: HashMap<String, Aggregate> = HashMap::new();

    // Read the file by line
    while let Some(Ok(line)) = lines.next() {
        // Parse each line for a record
        let Record { city, measurement } = parse_line(&line);

        // Record each measurement
        aggregate_map
            .entry(city)
            .and_modify(|aggregate| aggregate.record(measurement))
            .or_insert_with(|| Aggregate::new(measurement));
    }
}

Tip: IO Buffering

Two things remain to be defined in this code: the Aggregate struct and parse_line. First, the aggregate struct, Aggregate, holds the minimum, the maximum, and both the sum and count needed to calculate the mean for the output:

#[derive(Debug)]
struct Aggregate {
    total: f32,
    count: usize,
    min: f32,
    max: f32,
}

/// Straightforward enough definition
impl Aggregate {
    fn new(measurement: f32) -> Self {
        Self {
            total: measurement,
            count: 1,
            min: measurement,
            max: measurement,
        }
    }

    fn record(&mut self, measurement: f32) {
        self.total += measurement;
        self.count += 1;
        self.min = self.min.min(measurement);
        self.max = self.max.max(measurement);
    }

    fn mean(&self) -> f32 {
        self.total / (self.count as f32)
    }
}

Second, parse_line parses a line for the city and temperature measurement:

/// Can be a two tuple of (String, f32) but I prefer structs for clarity
#[derive(Debug)]
struct Record {
    city: String,
    measurement: f32,
}

fn parse_line(line: &str) -> Record {
    let (raw_city, raw_measurement) = line
        .split_once(';')
        .expect("line does not have a semicolon");

    Record {
        city: raw_city.to_owned(),
        measurement: raw_measurement.parse().expect("measurement is not a f32"),
    }
}

Since each data row is separated by a semicolon (;), using str::split_once and str::parse makes this easy. Once the data has been collected, the output is sorted by city, which can be done by converting the HashMap into a sorted BTreeMap:

let report_map: BTreeMap<String, Aggregate> = aggregate_map.into_iter().collect();

for (city, aggregate) in report_map {
    println!(
        "City: {:<20} | {:.2} | {:.2} | {:.2}",
        city,
        aggregate.min,
        aggregate.mean(),
        aggregate.max
    );
}

Tip: stdout Locking
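
In case the tip is unclear from its title alone, my understanding is that it is about taking the stdout lock once and writing through it, instead of letting every println! lock and unlock the stream. A sketch against the report loop above:

use std::io::Write;

let stdout = std::io::stdout();
let mut lock = stdout.lock();

for (city, aggregate) in report_map {
    writeln!(
        lock,
        "City: {:<20} | {:.2} | {:.2} | {:.2}",
        city,
        aggregate.min,
        aggregate.mean(),
        aggregate.max
    )
    .expect("stdout write failed");
}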

As a side note, this is not the challenge's output format but the one used by the person asking for help. I keep it here since it is easier to read, and the format only matters for validating the results. To visualize the difference between the output formats, this is the official format:

{Abha=-23.0/18.0/59.2, Abidjan=-16.2/26.0/67.3}

This is the alternative format:

City: Abha                 | -23.00 | 18.00 | 59.20
City: Abidjan              | -16.20 | 26.00 | 67.30

With a working solution, how long does it take to run?

time cargo run --release -- $HOME/measurements.txt
# real        05m26.50s

Ignoring compile time and making sure it is a release build, the baseline solution runs in around 5 minutes 26 seconds, which is 13 times slower than the target. Moving forward, this is the command I will use to check the time.
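
To keep compilation entirely out of the measurement, the binary can also be built first and timed directly; the binary name below is a stand-in for whatever the crate is actually called:

cargo build --release
time ./target/release/<crate-name> $HOME/measurements.txt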

>> Async Rust (5m51s)

Reference: Commit | main.rs

The initial solution's performance is somewhat expected given that it does not use asynchronous I/O to avoid blocking on file reads, nor does it use all cores. To enable asynchronous I/O in Rust, I will be utilizing tokio by adding it to the dependencies:

[dependencies]
tokio = { version = "1.36.0", features = ["full"] }

Migrating to asynchronous I/O is as simple as changing some imports from std::fs to tokio::fs and adding a few await points:

// From std::io and std::fs
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};

// Enable async
#[tokio::main]
async fn main() {
    let file = File::open(path).await.expect("file does not exist");
    let mut lines = BufReader::with_capacity(BUF_SIZE, file).lines();

    while let Ok(Some(line)) = lines.next_line().await {
    }
}

After updating the code to async, the performance is worse at 6 minutes 29 seconds. Checking the process viewer, it still uses a single thread because I did not use task::spawn to run work on other threads. A good starting point is breaking the current code into three tasks: read the file, parse lines and aggregate results:

// Unbounded channels to pass data between tasks
let (parser_tx, mut parser_rx) = mpsc::unbounded_channel();
let (collector_tx, mut collector_rx) = mpsc::unbounded_channel();

// Task to read data and send each line to the parsing task
let reader_task = task::spawn(async move {
    while let Ok(Some(line)) = lines.next_line().await {
        parser_tx.send(line).ok();
    }
});

// Task to parse records per line and send them to the collector task
let parser_task = task::spawn(async move {
    while let Some(line) = parser_rx.recv().await {
        let record = parse_line(&line);

        collector_tx.send(record).ok();
    }
});

// Task to collect the records
let collector_task = task::spawn(async move {
    let mut aggregate_map: HashMap<String, Aggregate> = HashMap::new();

    while let Some(record) = collector_rx.recv().await {
        let Record { city, measurement } = record;

        aggregate_map
            .entry(city)
            .and_modify(|aggregate| aggregate.record(measurement))
            .or_insert_with(|| Aggregate::new(measurement));
    }

    aggregate_map
});

// Wait for all tasks to finish
let (_, _, collector_res) = join!(reader_task, parser_task, collector_task);
let aggregate_map = collector_res.expect("results not sent");

The code should look similar despite being moved around into separate tasks. Each task loop ends when its corresponding resource (file or channel) is dropped, which creates a neat and clean task dependency without any extra concurrency primitives. While message passing between the channels might add some overhead, checking the time again reveals an execution of 5 minutes 51 seconds: faster than the plain async version, yet still slower overall than the baseline. Although this may be a step backwards, fully utilizing tokio may take a few adjustments.
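
As an aside, this shutdown cascade is worth seeing in isolation: once every sender is dropped, recv() returns None and the receiving loop simply ends. A minimal sketch of mine, separate from the solution:

use tokio::{sync::mpsc, task};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    task::spawn(async move {
        tx.send(1).ok();
        tx.send(2).ok();
        // tx is dropped here, which closes the channel
    });

    // recv() yields None once the channel is closed and drained
    while let Some(n) = rx.recv().await {
        println!("got {n}");
    }
}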

>> Byte Chunks (4m49s)

Reference: Commit | main.rs

Reading and passing only one line at a time gives each task little to work with and no avenue to divide the work among more workers. Reading a whole chunk of data and spawning a parsing task per chunk might be more efficient.

Replacing the convenient BufReader::lines() with the lower-level File::read_buf requires some thought. When reading a whole chunk of data, only chunks that end with a newline (\n) should be sent to the parser so it can split them neatly. Each new chunk is split at its last newline, and the leftover segment is merged into the beginning of the next chunk to create a smooth data pipeline. The new reader task looks like this:

let reader_task = task::spawn(async move {
    let mut prev = Vec::new();

    loop {
        // Setting the capacity is required for File::read_buf to work
        let mut buf = Vec::with_capacity(BUF_SIZE);

        // Add the leftover segment from the previous chunk
        if !prev.is_empty() {
            buf.extend(prev.clone());
        }

        // Read a chunk
        file.read_buf(&mut buf).await.expect("fill buffer failed");
        let buf_len = buf.len();

        // Split the chunk after (+1) the last newline.
        //
        // Using Iterator::rposition is more efficient here than
        // Iterator::position by starting at the end
        let newline_index = buf
            .iter()
            .rposition(|byte| *byte == b'\n')
            .expect("newline is missing");

        // Splitting the chunk with Vec::split_off makes this easy.
        //
        // Minor edge case if the newline is the last byte
        if newline_index + 1 < buf_len {
            prev = buf.split_off(newline_index + 1);
        } else {
            prev = Vec::new();
        }

        // Sanity check that the chunk is valid
        assert_eq!(*buf.last().unwrap(), b'\n');

        parser_tx.send(buf).ok();

        // If fewer bytes were filled, this was the last chunk
        if buf_len < BUF_SIZE {
            break;
        }
    }
});

Since a valid chunk is now passed to the parser task, it might be more efficient to split this chunk using a parallel processing library like rayon:

[dependencies]
rayon = "1.9.0"

The juicy function here is ParallelSlice::par_split, which hopefully splits the chunks faster than sequential iteration. A small caveat when using rayon with tokio is avoiding long-running tasks that can degrade the runtime's performance, explained more thoroughly in this article. It is also ambiguous how thread resources should be divided between the two without fine-tuning, but this is not a strong concern for now.
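
As a standalone illustration of what par_split does, here is a toy example of mine (not from the solution):

use rayon::prelude::*;

fn main() {
    let chunk = b"line one\nline two\nline three";

    // Each split slice is processed on rayon's thread pool
    let lengths: Vec<usize> = chunk
        .par_split(|byte| *byte == b'\n')
        .map(|line| line.len())
        .collect();

    assert_eq!(lengths, vec![8, 8, 10]);
}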

Now that each chunk is parsed into a list of records, it might also be more effective to collect the results into a mini-aggregate instead of passing along many individual records. The parsing task becomes:

let parser_task = task::spawn(async move {
    while let Some(chunk) = parser_rx.recv().await {
        let collector_tx = collector_tx.clone();

        // Spawn a task to parse this chunk
        task::spawn(async move {
            // Since the chunk ends with \n, par_split would create an
            // empty trailing slice, so skipping the last byte avoids
            // this problem for parse_line
            let records: Vec<_> = chunk[0..chunk.len() - 1]
                .par_split(|byte| *byte == b'\n')
                .map(parse_line)
                .collect();

            // A good point to yield if parsing takes too long
            task::yield_now().await;

            // Collect into a mini-aggregate
            let mut aggregate_map = HashMap::new();

            for record in records {
            }

            collector_tx.send(aggregate_map).ok();
        });
    }
});

Since parse_line needs to take a byte slice (&[u8]) instead of a string reference (&str), the function has to be updated:

fn parse_line(line: &[u8]) -> Record {
    // Since slice::split_once is locked behind a feature gate, do it
    // the longer way.
    //
    // See https://github.com/rust-lang/rust/issues/112811
    let semicolon_index = line
        .iter()
        .rposition(|byte| *byte == b';')
        .expect("line does not have a semicolon");

    let raw_city = std::str::from_utf8(&line[0..semicolon_index]).expect("city is not UTF8");
    let raw_measurement =
        std::str::from_utf8(&line[semicolon_index + 1..]).expect("measurement is not UTF8");

    // ...
}

Lastly, the collector task is the easiest to update to combine mini-aggregates instead of a single record:

impl Aggregate {
    /// Merging two aggregates together
    fn merge(&mut self, other: &Self) {
        self.total += other.total;
        self.count += other.count;
        self.min = self.min.min(other.min);
        self.max = self.max.max(other.max);
    }
}

let collector_task = task::spawn(async move {
    let mut aggregate_map: HashMap<String, Aggregate> = HashMap::new();

    while let Some(other_map) = collector_rx.recv().await {
        // Merging the smaller aggregate map into the larger one
        for (city, other_aggregate) in other_map {
            aggregate_map
                .entry(city)
                .and_modify(|aggregate| aggregate.merge(&other_aggregate))
                .or_insert_with(|| other_aggregate);
        }
    }

    aggregate_map
});

This was the most work by far, and it does produce faster results at around 4 minutes 49 seconds.

>> Fixed Workers (5m12s)

Reference: Commit | main.rs

At this point, the code is similar enough to Go in its overall approach, yet it is significantly slower. Checking the process monitor again, I noticed the memory usage was high enough to consume all my RAM and some of my swap, which is strange since the file is read in chunks. After tinkering with the time format options to reveal memory stats, how much memory does the code use?

# For -f to work, use /usr/bin/time instead of just time
$(which time) -f 'Time: %E | Memory: %M | Swapped: %W | Context Switched: %w' cargo run --release -- $HOME/measurements.txt
# Time: 4:52.60 | Memory: 29893648 | Swapped: 23559520 | Context Switched: 5698047

# For comparison
$(which time) -f 'Time: %E | Memory: %M | Swapped: %W | Context Switched: %w' go run 1brc_corlinp.go $HOME/measurements.txt
# Time: 0:26.06 | Memory: 1363336 | Swapped: 0 | Context Switched: 25325

If I read this correctly, my code uses around 30 GB of memory with occasional swapping, compared to the Go code's 1.3 GB, no swapping and far fewer context switches. Clearly, the immediate objective is to reduce the excessive memory usage. Moving forward again, this is the new command to check both time and memory.

To first address a glaring fault in my code: I spawned an unbounded number of parsing tasks, which may be causing huge memory and context switching overhead. The quick fix is to spawn a fixed number of workers and send each incoming chunk to a worker sequentially:

/// The Go code uses 256 workers
const NUM_WORKERS: usize = 256;

let parser_task = task::spawn(async move {
    let mut worker_txs: Vec<UnboundedSender<Vec<u8>>> = Vec::with_capacity(NUM_WORKERS);

    for _ in 0..NUM_WORKERS {
        // Create workers and only store their send channels
        let (worker_tx, mut worker_rx) = mpsc::unbounded_channel();
        worker_txs.push(worker_tx);

        let collector_tx = collector_tx.clone();

        task::spawn(async move {
            while let Some(chunk) = worker_rx.recv().await {
            }
        });
    }

    // Send each new chunk to the next worker
    let mut worker_iter = (0..worker_txs.len()).cycle();

    while let Some(chunk) = parser_rx.recv().await {
        // I would have used Iter::cycle_mut if it existed
        let worker_index = worker_iter.next().unwrap();
        let worker_tx = &mut worker_txs[worker_index];

        // Moved the parsing code to the worker loop
        worker_tx.send(chunk).ok();
    }
});

Tip: Task Pool

As a side note, I did find tokio_task_pool beforehand but was not sure of its quality, and this was a good opportunity to write a simple worker pool. The only caveat is fine-tuning the number of workers for performance, and the result is still slightly slower at 5 minutes 12 seconds. For now, I will keep this change just to keep the memory bounded.

>> Object Pool (2m53s)

Reference: Commit | main.rs

Since Go uses a memory pool to reduce heap allocations, reusing allocations in Rust by adding an object pool might also be the way to go. One of the biggest allocations, and therefore opportunities, is the large read buffer. Although my initial hand-rolled object pool worked, I decided to incorporate the more robust object-pool crate into the mix:

[dependencies]
object-pool = "0.5.4"
# To get the number of cores for allocation
num_cpus = "1.16.0"
# To allow static initializers
once_cell = "1.19.0"

I want to easily create a global object pool with one buffer allocated per logical core, which is why I also pulled in once_cell for once_cell::sync::Lazy and num_cpus for num_cpus::get. Aside from the convenience crates, the changes in the code are quite minimal:

/// Create a pool with a byte buffer for each core
static BUF_POOL: Lazy<Pool<Vec<u8>>> =
    Lazy::new(|| Pool::new(num_cpus::get(), || Vec::with_capacity(BUF_SIZE)));

// Reuse an available buffer
let mut buf: Reusable<Vec<u8>> = if let Some(buf) = BUF_POOL.try_pull() {
    buf
} else {
    // Important to work on another task first before retrying
    task::yield_now().await;

    continue;
};

// Reset the reused buffer
buf.clear();
buf.reserve(BUF_SIZE);

Checking the performance of this change gives a better 3 minutes 1 second with less memory used. If this works, what if I apply it to the aggregates as well? Before that, the challenge specifies a maximum of 10,000 unique cities, so the overall aggregate map can be prepared with that capacity:

const NUM_CITIES: usize = 10000;

let mut aggregate_map: HashMap<String, Aggregate> = HashMap::with_capacity(NUM_CITIES);

For the mini-aggregates, waiting for an available object is not advisable here, nor am I certain what a good pool size is, so I opt to allocate when needed and hope that 10,000 or NUM_CITIES objects is enough. Another challenge rule is that each weather station city is at most 100 bytes long, so each mini-aggregate can be given an estimated capacity of the chunk size divided by that length (256 KB / 100 ≈ 2,600 entries):

const CITY_LEN: usize = 100;

/// Likewise, a global aggregate pool
static AGGREGATE_POOL: Lazy<Pool<HashMap<String, Aggregate>>> =
    Lazy::new(|| Pool::new(NUM_CITIES, || HashMap::with_capacity(BUF_SIZE / CITY_LEN)));

// Reuse a mini-aggregate or create one again
let mut aggregate_map =
    AGGREGATE_POOL.pull(|| HashMap::with_capacity(BUF_SIZE / CITY_LEN));
// Make sure to clear before use
aggregate_map.clear();

After all the prepped allocations, the new code runs at 2 minutes 53 seconds with memory usage down to 19 GB, which is better but still large.

>> String Interning (0m52s)

Reference: Commit | main.rs

While the object pool did help, many objects are still being allocated. On closer look, the code allocates a lot of city strings just to serve as hash map keys that are only sorted near the end. So what if I place every city string in a global hash map where the key is a unique yet cheap-to-copy identifier? My initial prototype used hash::BuildHasher with once_map::OnceMap, a concurrent hash map where entries can only be inserted, never modified:

static STRING_MAP: Lazy<OnceMap<u64, String>> = Lazy::new(|| OnceMap::new());

// Use the map hash as the key
let hash = STRING_MAP.hasher().hash_one(raw_city);

if !STRING_MAP.contains_key(&hash) {
    STRING_MAP.insert(hash, move |_| raw_city.to_owned());
};

This concept sounded familiar, and I recognized it as string interning. Instead of my prototype, I decided to pull in a more mature crate like lasso for the job:

[dependencies]
lasso = { version = "0.7.2", features = ["multi-threaded", "ahasher"] }

I will be using lasso::ThreadedRodeo with ahash as the hasher for a fast keyed hash. The key function is ThreadedRodeo::get_or_intern, which is used during parsing:

/// Create a global string map
static STRING_MAP: Lazy<ThreadedRodeo> =
    Lazy::new(|| ThreadedRodeo::with_capacity(Capacity::for_strings(NUM_CITIES)));

#[derive(Debug)]
struct Record {
    /// Change city to city_key as well as from String to lasso::Spur
    city_key: Spur,
}

fn parse_line(line: &[u8]) -> Record {
    // Intern the string slice for a key
    let city_key = STRING_MAP.get_or_intern(raw_city);

    Record {
        city_key,
    }
}

lasso::Spur is a 32 bit key, which is much better than my 64 bit hash. Since the key type changed, I have to update a few types and resolve each key back into a string during the sorting step, which is easy to refactor:

let report_map: BTreeMap<&str, Aggregate> = aggregate_map
    .into_iter()
    // Convert the key back into a string slice
    .map(|(city_key, aggregate)| (STRING_MAP.resolve(&city_key), aggregate))
    .collect();

After updating the code, it runs much faster at just below 52 seconds with a memory usage of 1.2 GB, which is less than Go's 1.3 GB. It is amazing how much those small string allocations weighed on the performance of the code.

>> No Rayon (0m38s)

Reference: Commit | main.rs

If small but numerous heap allocations are the issue, then rayon is on the chopping block for allocating intermediate vectors as part of its parallel processing. What would happen if the code were refactored without rayon to optimize for memory instead? Looking at task::spawn_blocking, it can create threads on demand and queue the chunks, unlike task::spawn. This means the parser workers can be reverted to a simpler form:

// The parser task back to its simpler form
let parser_task = task::spawn(async move {
    while let Some(chunk) = parser_rx.recv().await {
        // From task::spawn to task::spawn_blocking
        task::spawn_blocking(move || {
            // Turn the parser into an iterator, skipping the trailing
            // newline as before
            let mut record_iter = chunk[0..chunk.len() - 1]
                .split(|byte| *byte == b'\n')
                .map(parse_line);

            while let Some(record) = record_iter.next() {
            }
        });
    }
});

A notable change is that record parsing is now an iterator, which will be improved later on. With this simple refactor, the time is now at 38 seconds. A minor improvement, but the code does feel less complex without a second thread pool.

>> Integer Arithmetic (0m36s)

Reference: Commit | main.rs

Aside from one more minor saving in vector allocation, it might be better to optimize along other avenues again. The challenge specifies that the temperature measurements are floating point numbers from -99.9 to 99.9 with exactly one decimal digit always present. This means that instead of storing a measurement as an f32, it can be stored as an i16 holding the value multiplied by 10, which takes advantage of integer arithmetic and is only divided by 10 when displayed. Seeing it in code may be clearer:

// Sum is now an isize instead of f32
let mut isum: isize = 0;
let mut count: usize = 0;

// Temperatures always have 1 decimal digit
let temps = vec![-54.3, 21.0, -0.9, 77.0];

for temp in temps {
    // Hypothetical value from parsing a line; rounding guards against
    // float representation error
    let itemp = (temp * 10.0).round() as i16;

    isum += itemp as isize;
    count += 1;
}

// Convert the integer sum back to f32 for display
println!("MEAN: {:.2}", ((isum as f32) / 10.0) / (count as f32));

Changing the types is easy, and I added an i prefix to the affected field names to mark the integers standing in for floats:

/// Added an 'i' prefix for each variable to indicate the change
#[derive(Debug)]
struct Record {
    city_key: Spur,
    imeasurement: i16,
}

#[derive(Debug)]
struct Aggregate {
    itotal: isize,
    count: usize,
    imin: i16,
    imax: i16,
}

/// Display function updated as well as new min and max function
impl Aggregate {
    fn min(&self) -> f32 {
        (self.imin as f32) / 10.0
    }

    fn mean(&self) -> f32 {
        (self.itotal as f32) / (self.count as f32) / 10.0
    }

    fn max(&self) -> f32 {
        (self.imax as f32) / 10.0
    }
}

Aside from some minor type-casting changes, the measurement parsing needs to be updated since f32 parsing can no longer be used. The parser has to handle the measurement format, which boils down to:

  1. Optional Hyphen
  2. Digit
  3. Optional Digit
  4. Period
  5. Digit

Translating this as an iterator:

let raw_measurement = &line[semicolon_index + 1..];

// Parsing byte by byte with an iterator
let mut measurement_iter = raw_measurement.iter().peekable();

// 1. Optional Sign
//
// If the first character is a hyphen, the number is negative, so
// advance the iterator.
let sign: i16 = if **measurement_iter.peek().unwrap() == b'-' {
    let _hyphen = measurement_iter.next();

    -1
} else {
    1
};

// 2. Digit
//
// A neat trick to get the digit value from a byte
let first_digit = (measurement_iter.next().unwrap() - b'0') as i16;

// 3 & 4. Optional Digit and Period
//
// If the next byte is a period, there is no tens digit, so the first
// digit shifts down to the ones place; otherwise, the first digit is
// the tens and the next digit (followed by a period) is the ones
let (tens_digit, ones_digit) = if **measurement_iter.peek().unwrap() == b'.' {
    let _period = measurement_iter.next();

    (0, first_digit)
} else {
    let digit = (measurement_iter.next().unwrap() - b'0') as i16;
    let _period = measurement_iter.next();

    (first_digit, digit)
};

// 5. Digit
let tenths_digit = (measurement_iter.next().unwrap() - b'0') as i16;

// Safety assertion that it is the end
assert!(measurement_iter.next().is_none());

// Calculate the integer measurement using the digits and sign
let imeasurement = sign * (tens_digit * 100 + ones_digit * 10 + tenths_digit);
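
To sanity-check the digit math with a couple of my own values: "-12.3" gives a sign of -1 and digits (1, 2, 3), so -(1 * 100 + 2 * 10 + 3) = -123, while "9.5" has no tens digit and gives 0 * 100 + 9 * 10 + 5 = 95.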

This change improves the time to 36 seconds.

>> Parser Iterator (0m33s)

Reference: Commit | main.rs

The last improvement to line parsing is to minimize passes over the large buffer as much as possible. The record iterator uses slice::split, which scans the buffer once just to hand each line to parse_line, which then scans it again. Removing this extra iteration is a potential optimization:

let mut record_iter = chunk[0..chunk.len() - 1]
    .split(|byte| *byte == b'\n') // Extra iteration on the buffer
    .map(parse_line);

Tip: Iterator Chaining

The next move is to expand parse_line into an iterator that takes the whole chunk instead of a single line:

// What it should look like
let mut record_iter = parse_chunk(&chunk);

fn parse_chunk<'a>(bytes: &'a [u8]) -> ChunkParser<'a> {
    ChunkParser { bytes }
}

/// Define the record iterator
struct ChunkParser<'a> {
    bytes: &'a [u8],
}

impl<'a> Iterator for ChunkParser<'a> {
    type Item = Record;

    fn next(&mut self) -> Option<Self::Item> {
        // Move the expanded parse_line here
    }
}

Since the iterator holds a borrowed slice, the trick to consuming it is to reborrow the slice past each parsed line until it is empty. To sanely get the next index, I had to use Iterator::enumerate, which changes the item type of measurement_iter and slightly alters the parse_line logic. Showing just the main changes in the iterator:

// Termination condition
if self.bytes.is_empty() {
    return None;
}

let semicolon_index = self.bytes.iter().position(|byte| *byte == b';').unwrap();

// Reborrow here to make it easier for the next section
self.bytes = &self.bytes[semicolon_index + 1..];

// Adding Iterator::enumerate to determine the position of the last byte
let mut measurement_iter = self.bytes.iter().enumerate().peekable();

let imeasurement = sign * (tens_digit * 100 + ones_digit * 10 + tenths_digit);

// A newline must also be consumed now
let (_, newline) = measurement_iter.next().unwrap();
assert_eq!(*newline, b'\n');

// If the iterator has more bytes, move the slice to that index
if let Some((next_index, _)) = measurement_iter.peek() {
    self.bytes = &self.bytes[*next_index..];
} else {
    // Borrow an empty slice to end the cycle
    self.bytes = &self.bytes[0..0];
}

Removing the extra iteration brings the time down to 33 seconds.

>> Unsafe str (0m29s)

Reference: Commit | main.rs

While working on the previous change, I noticed the following conversion:

let city_bytes = &self.bytes[0..semicolon_index];

// Is the string conversion fast?
let raw_city = std::str::from_utf8(city_bytes).expect("city is not UTF8");

The function str::from_utf8 first checks that the bytes are valid UTF-8 before performing the conversion, which is good. Since the challenge data is guaranteed to be valid, the check can be skipped in favor of str::from_utf8_unchecked to convert the type directly:

let city_bytes = &self.bytes[0..semicolon_index];

// Safety: Skip the validity check given the challenge data rules
let raw_city = unsafe { std::str::from_utf8_unchecked(city_bytes) };

Tip: Bounds Check

This is the only time unsafe Rust is used, so no need to panic. Still, the time did go down to 29 seconds just for skipping data validation.

>> Allocation Cleanup (0m27.75s)

Reference: Commit | main.rs

Using the object pool did help with memory usage, but since I limited the number of buffers in the pool, the reader has to wait whenever the pool is empty:

let mut buf: Reusable<Vec<u8>> = if let Some(buf) = BUF_POOL.try_pull() {
    buf
} else {
    // How many times does this branch trigger?
    task::yield_now().await;

    continue;
};
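
For reference, this is roughly how I counted the misses: a global atomic incremented in the else branch and printed once after the tasks are joined (a sketch of the measurement, not code that stays in the solution):

use std::sync::atomic::{AtomicUsize, Ordering};

static POOL_MISSES: AtomicUsize = AtomicUsize::new(0);

// Inside the else branch above
POOL_MISSES.fetch_add(1, Ordering::Relaxed);

// After all the tasks are joined
println!("pool misses: {}", POOL_MISSES.load(Ordering::Relaxed));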

Checking the branch with a counter, it is triggered around 300,000 times per run on average. One way to reduce these misses is to increase the number of available buffers, but going that route requires num_cpus::get() * 32 or 512 buffers, and the extra allocations made it run slightly slower. Another route is the bytes crate, which provides a more efficient shared byte buffer:

[dependencies]
bytes = "1.5.0"

It is functionally an Arc<Vec<u8>> with more optimizations, including support for disjoint buffers (a rope structure), which fits nicely with joining each chunk to the previous leftover. Since the API is similar to Vec, it is mostly a type change:

let reader_task = task::spawn(async move {
    // Bytes::new is zero allocation
    let mut prev = Bytes::new();
    // Create an empty buffer
    let buf = BytesMut::with_capacity(BUF_SIZE);

    loop {
        // Instead of using an object pool, cloning the empty buffer
        // works well enough
        let mut buf: BytesMut = buf.clone();
    }
});

This is strangely more efficient, bringing the time down to 27.75 seconds.

>> Block Remove (0m26.5s)

Reference: Commit | main.rs

To satisfy the compiler while refactoring the measurements from floating point to integer types, I had added HashMap::drain:

while let Some(mut other_map) = collector_rx.recv().await {
    // HashMap::drain so that insert works
    for (city, other_aggregate) in other_map.drain() {
        aggregate_map
            .entry(city)
            .and_modify(|aggregate| aggregate.merge(&other_aggregate))
            // Insert requires owned data
            .or_insert_with(|| other_aggregate);
    }
}

Afraid of cloning, I had opted to remove entries one by one from the map; however, since these maps are returned to the object pool when dropped, it might be better to clone the aggregates and let HashMap::clear remove them all at once. Cloning also occurs only on insert, which is far less frequent than modification:

aggregate_map
    .entry(*city)
    .and_modify(|aggregate| aggregate.merge(&other_aggregate))
    // Accept cloning
    .or_insert_with(|| other_aggregate.clone());

Tip: Reusing Collections

This simple change drops the time to 26.5 seconds, which is somewhat surprising.

>> No Strings (0m24s)

Reference: Commit | main.rs

Reviewing the Go code, it does not actually use strings but fixed-length byte arrays, notably as keys for the city names, which are then displayed as strings via fmt.Printf:

type CityName [64]byte

fmt.Printf("City: %s", city[:])

For Rust, this means replacing the string slice (&str) with a byte slice (&[u8]) and converting to a string slice only right before output. This makes sense and saves numerous byte-to-string conversion calls. Since this is not doable with lasso, I have to write my own byte slice equivalent optimized for this use case. Ideally, it only does the following:

// No more str conversion
let raw_city = &self.bytes[0..semicolon_index];

// Must take a byte slice now
let city_key = STRING_MAP.get_or_intern(raw_city);

println!(
    "City: {:<20} | {:.2} | {:.2} | {:.2}",
    // Conversion to str moved here
    unsafe { std::str::from_utf8_unchecked(&city_bytes) },
    aggregate.min(),
    aggregate.mean(),
    aggregate.max()
);

A quick implementation is to create one very large buffer and hand out a segment of it for each unique hash of a byte slice. To make this work, I will be adding the concurrent hash map dashmap along with the familiar ahash, both of which lasso uses internally:

[dependencies]
dashmap = { version = "5.5.3", features = ["inline"] }
ahash = { version = "0.8.11", features = ["compile-time-rng"] }

My idea looks like this:

// Use ahash for the hasher
use ahash::RandomState;

/// Use const generics to represent the length of a line (LEN) and the
/// maximum number of lines (CAP). Perhaps unnecessary?
struct ByteStringPool<const LEN: usize, const CAP: usize> {
    buf: Mutex<BytesMut>,
    segments: DashMap<u32, Bytes, RandomState>,
}

impl<const LEN: usize, const CAP: usize> ByteStringPool<LEN, CAP> {
    fn new() -> Self {
        Self {
            /// The shared global buffer
            buf: Mutex::new(BytesMut::zeroed(LEN * CAP)),
            /// Each hash is mapped to an immutable buffer region
            segments: DashMap::with_capacity_and_hasher(CAP, RandomState::new()),
        }
    }
}

/// New definition
static STRING_MAP: Lazy<ByteStringPool<CITY_LEN, NUM_CITIES>> = Lazy::new(|| ByteStringPool::new());

By allocating all the required byte string memory up front, I am consciously trading memory for speed: 100 bytes for each of the 10,000 possible cities, or 1,000,000 bytes in total. Since it is a write-once data structure, it is easy to ignore any locking errors as long as the data gets written:

fn intern(&self, bytes: &[u8]) -> u32 {
    // Make sure the city bytes follow the length restriction. LEN can
    // be lowered even further later on, which is why this check is here.
    let bytes_len = bytes.len();
    assert!(bytes_len <= LEN);

    // Generate the hash used as the key
    let hash = self.segments.hasher().hash_one(bytes) as u32;

    // Make sure the capacity is correct
    assert!(self.segments.len() <= CAP);

    // Lock the entry if it does not exist
    if let Some(Entry::Vacant(entry)) = self.segments.try_entry(hash) {
        if let Ok(mut buf) = self.buf.lock() {
            // Check if the buffer can still be split
            assert!(!buf.is_empty());

            // Cut a segment from the front
            let mut segment = buf.split_to(LEN);

            // Drop the mutex lock early for speed
            drop(buf);

            // BytesMut::truncate needed for BytesMut::copy_from_slice
            // to work properly
            segment.truncate(bytes_len);
            segment.copy_from_slice(bytes);

            // Use BytesMut::freeze to mark it read-only
            entry.insert(segment.freeze());
        }
    }

    hash
}

Implementing resolve is trivial but included for completeness:

fn resolve(&self, key: &u32) -> Bytes {
    self.segments.get(key).expect("Invalid key").clone()
}
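
A quick usage sketch of the pool as defined above (example values mine):

let key = STRING_MAP.intern(b"Abha");
let city_bytes = STRING_MAP.resolve(&key);

assert_eq!(&city_bytes[..], &b"Abha"[..]);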

Surprisingly, this DIY byte string map pushes the time down enough to beat the Go implementation at 24 seconds, although the memory usage did spike to 4 GB. As a side note, I do have an even faster implementation of this, but it uses raw pointers and atomic types that I am not yet well versed enough to explain.

>> Tokio Tuning (0m23.5s)

Reference: Commit | main.rs

Before wrapping up, it is time for one final tuning pass to make the time more consistent. I wanted to keep the memory usage under control, but it varied from run to run even though the code has minimal allocations, which leaves tokio's runtime in question. Among all the configurations, I found setting Builder::max_blocking_threads to the number of cores (16) minus the main spawned tasks and the current thread (4 in total) to be the sweet spot for speed while minimizing allocations:

fn main() {
    let cpus = num_cpus::get();

    let rt = Builder::new_multi_thread()
        .max_blocking_threads(cpus - 4)
        .build()
        .expect("Runtime config error");

    rt.block_on(execute());
}

/// Formerly main
async fn execute() {
}

This finally brings the time to around 23.5 seconds and 0.6 GB of memory usage, which finalizes my solution.

>> Conclusion

Given the same task, Go allocates and reuses memory really well without much effort, and most of my best time savings came from doing the same in Rust. While I do not necessarily want an all-encompassing solution, a well documented and supported crate for general automatic memory management or something similar would have closed the gap much more easily; that space feels like a barren field. I only know of crossbeam::epoch, which is well supported, but I do not want to deal with atomic orderings, and nothing else seems to fit. Although the object-pool and lasso crates really helped as well, they do not feel like standard, widely accepted solutions. Dropping rayon and writing a DIY byte string cache for lack of strong and obvious alternatives made me feel this hole in the crate ecosystem. Perhaps it is a skill and knowledge failing on my part; I just thought this general performance challenge would be easier to tackle by now.

My introspective takeaway, though, is to be wary of overselling memory efficiency as a core value, seeing the gap in effort and time. Nonetheless, I did enjoy working on this challenge on my own terms, and being rewarded with a lower time, especially by reducing allocations and being more precise, was a dopamine hit. To reiterate, this is not about Rust vs Go; it is about satisfying my curiosity as to why a Rust implementation can underperform and how to resolve that, with a good Go implementation as a really tight target, and I am satisfied with the result.

>> Notable Attempts

Since I already achieved my goal, I am not optimizing further than this. I do want to note some of my other attempts that did not make it through:

>> Special Mention: mo8it/obrc (0m10s)

Reference: Repo

Special mention to @mo8it@fosstodon.org for an implementation that just impresses me; I want to add a special section on my findings about it. It primarily uses memmap2, where I had not realized the mapped file can be treated as a byte slice, which makes bytes unnecessary since each chunk can be borrowed directly instead of waiting on each read buffer. This also means the whole file can be shared across multiple threads using thread::scope, so tokio is no longer needed. The general idea looks like this:

let file = OpenOptions::new().read(true).open(path).expect("file does not exist");
let mmap = unsafe { Mmap::map(&file) }.expect("mmap failed");
// mmap can be treated as a byte slice
let bytes: &[u8] = &mmap;

// Divide the work among the threads
let workers = num_cpus::get();
let chunk_len = bytes.len() / workers;

thread::scope(|s| {
    for i in 0..workers {
        // Spawn a worker for each chunk (the real code also aligns
        // each chunk boundary to a newline)
        let worker_handle = s.spawn(move || {
            let chunk = &bytes[chunk_len * i..chunk_len * (i + 1)];
            let mut record_iter = parse_chunk(chunk);
        });
    }
});

Aside from having a more optimized measurement parser, here are some points that I picked up:

The code is so much shorter and more efficient that I am thankful to have learned a bit more about performance.

>> Resources & References

Much gratitude for the resources and references I used for this article: