Playing with 🦀 Rust: Building a cli app to estimate disk space usage

rust Oct 10, 2021

I have been fascinated by 🦀 Rust for a while now. However, unlike some other languages I've tried over the years, the journey has not been straightforward so far. That is mostly because you have to be the coder as well as the garbage collector at the same time.

I am slowly getting a grip on this, thanks to the best-in-class documentation and community support available on the internet. Since I learn best by "doing", I spent this weekend building a small CLI tool to estimate the disk space used by a folder, including all its subfolders. Something like the du command that comes with GNU Coreutils:

$ du -hd 0
156M    .

❓This post was written by a Rust newbie - so you know what to do! I may be talking nonsense here, so help this friend out by posting your suggestions in the comments.


The Idea

The CLI takes a path (or defaults to the current working directory) and displays the number of files it contains and the total disk space they use.

# current path
$ dux
Total size is 204828703 bytes (204.83 MB) across 1176 items

# specify a path
$ dux ~/bin/
Total size is 586934311 bytes (586.93 MB) across 3372 items

Quick and dirty implementation

The first version was very straightforward: it simply traverses the file system on a single thread. Obviously, this was slow.

[package]
name = "dux"
version = "0.1.0"
edition = "2018"

[profile.release]
lto = true

[dependencies]
pretty-bytes = "0.2.2"
cargo.toml
use pretty_bytes::converter::convert as humanize_byte;
use std::path::{Path, PathBuf};
use std::{env, path};

fn main() {
    let current_path = env::current_dir()
        .expect("could not read the current directory")
        .to_str()
        .expect("current directory is not valid UTF-8")
        .to_string();
    let args: Vec<String> = env::args().collect();
    let target = args.get(1).unwrap_or(&current_path);
    let path = path::Path::new(target);

    if !path.exists() {
        eprintln!("Invalid path: {}", &target);
    } else if path.is_file() {
        todo!();
    } else if path.is_dir() {
        // Single threaded
        let size: f64 = size_of_dir_single_threaded(path) as f64;
        println!("Total size is {} bytes ({})", size, humanize_byte(size));
    } else {
        eprintln!("Unknown type {}", target);
    }
}

fn size_of_dir_single_threaded(path: &path::Path) -> u64 {
    if !path.is_dir() {
        return 0;
    }

    let mut count = 0;
    for entry in path.read_dir().expect("Read dir").flatten() {
        let path = entry.path();
        if path.is_file() {
            count += path.metadata().unwrap().len();
        } else if path.is_dir() {
            count += size_of_dir_single_threaded(&path);
        }
    }
    count
}
main.rs

This is obviously going to be painfully slow. We can do better! 👐
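Before optimising, it can help to pin the single-threaded walker down against a directory tree whose size you already know. The sketch below is a variant of size_of_dir_single_threaded, not the code above: it assumes we'd rather return io::Result than panic, and it uses DirEntry::file_type, which describes the entry itself and does not follow symlinks, so a symlinked folder cannot send the walk into a loop. make_fixture is a hypothetical helper that writes three files of 100, 20 and 3 bytes.

```rust
use std::fs;
use std::io;
use std::path::Path;

// Variant of size_of_dir_single_threaded that returns errors instead of
// panicking. Note: one unreadable folder still aborts the whole walk.
fn size_of_dir_single_threaded(path: &Path) -> io::Result<u64> {
    let mut total = 0;
    for entry in fs::read_dir(path)? {
        let entry = entry?;
        // file_type() describes the entry itself and does not follow
        // symlinks, so symlinked folders are not descended into.
        let file_type = entry.file_type()?;
        if file_type.is_file() {
            total += entry.metadata()?.len();
        } else if file_type.is_dir() {
            total += size_of_dir_single_threaded(&entry.path())?;
        }
    }
    Ok(total)
}

// Hypothetical test helper: a tiny tree with 100 + 20 + 3 bytes of files.
fn make_fixture(root: &Path) -> io::Result<()> {
    fs::create_dir_all(root.join("sub").join("nested"))?;
    fs::write(root.join("a.txt"), vec![0u8; 100])?;
    fs::write(root.join("sub").join("b.txt"), vec![0u8; 20])?;
    fs::write(root.join("sub").join("nested").join("c.txt"), vec![0u8; 3])?;
    Ok(())
}
```

Running the walker against that fixture should report 123 bytes, which gives a baseline to compare the faster versions against.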

Objectives

I am hoping to touch on a few more Rust concepts and implement a more performant solution that gives me exposure to:

  • Channels - multiple producers walk the directory tree and multiple receivers count in parallel.
  • Spawning multiple threads (by default, one thread per CPU core).
  • Relevant Rust traits.

Better version

It always starts with a c̶l̶a̶s̶s̶  struct

Let's start by defining a struct to hold the statistics we collect as we traverse the file system. We are interested in keeping track of the total disk space in bytes and the number of files we've traversed so far.

#[derive(Default)]
struct Stats {
    size: u64,
    count: i32,
}

We are using the derive macro to auto-generate the code for:

  • Default trait: so we can create a default instance using Stats::default(). Another approach would be to implement an associated new function that returns a new instance, but I think this trait is more idiomatic and better understood by others. Both fields start at the default value of their data type: size = 0, count = 0.
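To make the derive a bit less magical, here is roughly the impl it expands to, written by hand for illustration (the real macro output differs in details):

```rust
struct Stats {
    size: u64,
    count: i32,
}

// Roughly what #[derive(Default)] generates for Stats: every field is set
// to its own type's default value, which is 0 for both u64 and i32.
impl Default for Stats {
    fn default() -> Self {
        Stats {
            size: u64::default(),
            count: i32::default(),
        }
    }
}
```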

Next, let's implement the Display trait to specify how this data is formatted, so it matches our expected output, e.g. 586934311 bytes (586.93 MB) across 3372 items

use pretty_bytes::converter::convert as humanize_byte;
use std::fmt::Display;

impl Display for Stats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{} bytes ({}) across {} items",
            self.size,
            humanize_byte(self.size as f64),
            self.count
        )
    }
}

The humanize_byte function comes from a crate called pretty-bytes, so we need to update cargo.toml accordingly. While we are at it, we will also add another crate that lets us retrieve the number of CPU cores available on the machine.

# ...
[dependencies]
pretty-bytes = "0.2.2"
num_cpus = "1.13.0"
cargo.toml
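To check the format string without pulling in any crates, here is a self-contained sketch. humanize_byte here is a hypothetical stand-in for pretty_bytes' convert that assumes decimal units (1 kB = 1000 bytes) and two decimals; the real crate's output may differ in edge cases.

```rust
use std::fmt::{self, Display};

struct Stats {
    size: u64,
    count: i32,
}

// Hypothetical stand-in for pretty_bytes' `convert`: decimal (SI) units,
// two decimals. Divides by 1000 until the value fits the unit.
fn humanize_byte(bytes: f64) -> String {
    const UNITS: [&str; 5] = ["B", "kB", "MB", "GB", "TB"];
    let mut value = bytes;
    let mut unit = 0;
    while value >= 1000.0 && unit < UNITS.len() - 1 {
        value /= 1000.0;
        unit += 1;
    }
    format!("{:.2} {}", value, UNITS[unit])
}

impl Display for Stats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{} bytes ({}) across {} items",
            self.size,
            humanize_byte(self.size as f64),
            self.count
        )
    }
}
```

Because Display is implemented, to_string() and println!("{}", stats) both come for free.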

The better size_of_dir function

Let's update our main function to use a function that walks all the nested subdirectories and returns a Stats object with all the information we are after. We will also specify the number of worker threads to use to speed up the process.

fn main() {
    // [...]
    if !path.exists() {
        // [...]
    } else if path.is_file() {
        // [...]
    } else if path.is_dir() {
        // Single threaded
        // code is removed

        // Multi threaded
        let cores = num_cpus::get().to_string();
        let cores = std::env::var("WORKERS").unwrap_or(cores).parse().unwrap();
        let stat = size_of_dir(path, cores);

        println!("Total size is {}", stat);
    } else {
        // [...]
    }
}

// new function
fn size_of_dir(path: &path::Path, num_threads: usize) -> Stats {
    todo!();
}

As you can see, we make it possible to override the number of threads via an environment variable, WORKERS.
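As written, WORKERS=abc panics and WORKERS=0 spawns no workers at all. A more forgiving sketch, assuming we'd rather fall back to the default than fail: worker_count, default_workers and workers_from_env are hypothetical helpers, and std::thread::available_parallelism (stable since Rust 1.59) is swapped in here as a std-only alternative to num_cpus.

```rust
use std::env;
use std::thread;

// Hypothetical helper: use WORKERS if it is a positive integer,
// otherwise fall back to `default` instead of panicking.
fn worker_count(workers: Option<String>, default: usize) -> usize {
    workers
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(default)
}

// Std-only alternative to num_cpus::get(); available_parallelism can
// fail on some platforms, so fall back to a single worker.
fn default_workers() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}

fn workers_from_env() -> usize {
    worker_count(env::var("WORKERS").ok(), default_workers())
}
```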

Type inference

Note that we read either the environment variable or the number of cores (both are Strings) and parse the result into a usize without explicitly specifying the type (e.g. parse::<T>()).

// `parse()` doesn't know the target type yet
let cores = std::env::var("WORKERS").unwrap_or(cores).parse().unwrap();

// `cores` is passed as an argument that requires type `usize`, so `parse` in the line above is inferred as `parse::<usize>()`
let stat = size_of_dir(path, cores);

As soon as we pass cores into the size_of_dir function as the num_threads argument, which has type usize, the compiler infers that parse must produce a usize. Nice! 💛💛
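A minimal, std-only illustration of the same inference, where takes_usize is a hypothetical stand-in for size_of_dir's usize parameter:

```rust
// Hypothetical stand-in with the same parameter type as num_threads.
fn takes_usize(num_threads: usize) -> usize {
    num_threads
}

fn inferred(input: &str) -> usize {
    // No turbofish: `parse` becomes `parse::<usize>()` because `cores`
    // is later passed where a `usize` is required.
    let cores = input.parse().unwrap();
    takes_usize(cores)
}

fn explicit(input: &str) -> usize {
    // The equivalent, fully spelled-out form.
    let cores = input.parse::<usize>().unwrap();
    takes_usize(cores)
}
```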

Hard question: How to channel in 🦀 Rust?

Channels allow you to safely share data between threads. Depending on the type of channel, there can be multiple publishers and multiple receivers on either end of a channel. This gives us a message-passing model that removes the need for variables shared across threads, which are notoriously hard to get right (in other languages). Channels are available in many modern languages, like C#'s System.Threading.Channels or Go's channels. I was blown away by the simplicity of using channels across different goroutines.

In my use case, I want:

  • thread 1: to walk into each folder and, for each sub-folder, publish a message on this channel to notify other (idle) workers to process it.
    • For each file it encounters, it increments a Stats object it holds with the size of the file and increments the count by 1.
  • threads 2..n: to receive each of these messages, walk into those folders and repeat what thread 1 does.

Now, a basic channel implementation for Rust is in std::sync::mpsc::channel. It's explained brilliantly in Jon Gjengset's Crust Of Rust video about channels.

However, looking at the documentation:

The Sender can be cloned to send to the same channel multiple times, but only one Receiver is supported.

So it supports only one Receiver, which is not useful for our use case. A quick Google search took me to a similar question in the Rust user group, where a fellow Rustacean suggested the crossbeam_channel crate. Sweet! Let's add it to our cargo.toml:

# ...
[dependencies]
# ...
crossbeam-channel = "0.5.1"
cargo.toml

Let's implement our size_of_dir function:

use crossbeam_channel::{unbounded, Receiver, Sender};

fn size_of_dir(path: &path::Path, num_threads: usize) -> Stats {
    let mut stats = Vec::new();
    let mut consumers = Vec::new();
    {
        let (producer, rx) = unbounded();

        for idx in 0..num_threads {
            let producer = producer.clone();
            let rx = rx.clone();

            consumers.push(std::thread::spawn(move || worker(idx, rx, &producer)));
        }

        // walk the root folder
        stats.push(walk(path, &producer));
    } // extra block so the main thread's Sender and Receiver are dropped early.
      // Note: every worker still owns a Sender clone, so the channel never
      // disconnects by itself; workers exit when recv_timeout times out.

    // wait for all receiver to finish
    for c in consumers {
        let stat = c.join().unwrap();
        stats.push(stat);
    }

    stats.iter().sum()
}

// this is a worker
fn worker(idx: usize, receiver: Receiver<PathBuf>, sender: &Sender<PathBuf>) -> Stats {
    todo!();
    // walks into each PathBuf it receives from receiver,
    // returns the Stat object in the end
}

fn walk(path: &path::Path, sender: &Sender<PathBuf>) -> Stats {
    todo!();
    // actual calculation happens here
    // 1. for each file in path, increment a local Stat object with the size
    // 2. for each folder encountered - publish a message using sender, so another worker can pick up and process this
    // 3. return the Stat object in the end
}

We start by creating a list of Stats objects, which we populate as each worker thread completes and returns its own Stats instance. The worker threads each process a varying number of folders, so the Stats need to be added together at the end.
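The idea of each thread handing back its own result through its JoinHandle, with no shared state while the threads run, can be shown in isolation. In this sketch each thread sums a hypothetical list of file sizes instead of walking folders:

```rust
use std::thread;

#[derive(Default, Debug, PartialEq)]
struct Stats {
    size: u64,
    count: i32,
}

// Each thread owns its chunk and builds a private Stats; join() hands
// the value back, and the main thread adds them up.
fn collect(chunks: Vec<Vec<u64>>) -> Stats {
    let handles: Vec<_> = chunks
        .into_iter()
        .map(|sizes| {
            thread::spawn(move || {
                let mut stat = Stats::default();
                for size in sizes {
                    stat.size += size;
                    stat.count += 1;
                }
                stat
            })
        })
        .collect();

    let mut total = Stats::default();
    for handle in handles {
        let stat = handle.join().unwrap();
        total.size += stat.size;
        total.count += stat.count;
    }
    total
}
```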

New Trait: Sum<T>

The first error: we are unable to call sum() on an iterator over our own struct, Stats.

stats.iter().sum()
    Checking dux v0.1.0 (/home/mustakim/code/rust-projects/dux)
error[E0277]: the trait bound `Stats: Sum<&Stats>` is not satisfied
   --> src/main.rs:124:18
    |
124 |     stats.iter().sum()
    |                  ^^^ the trait `Sum<&Stats>` is not implemented for `Stats`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `dux` due to previous error

As in most other cases, the Rust compiler tells you exactly what to do next. That's how I got introduced to the Sum<T> trait, which we need to implement:

impl<'a> std::iter::Sum<&'a Stats> for Stats {
    fn sum<I: Iterator<Item = &'a Stats>>(iter: I) -> Self {
        let mut result = Self::default();
        for stat in iter {
            result.count += stat.count;
            result.size += stat.size;
        }
        result
    }
}

If you are a newbie like me, you may start without the lifetime annotation or even without the generic type parameter - the compiler will guide you there.
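A compact variant of the same impl, using fold instead of the for loop. As an extra (not something the post needs), it also implements Sum for owned Stats, which lets stats.into_iter().sum() work when the Vec isn't needed afterwards. Debug and PartialEq are derived only so results can be compared:

```rust
use std::iter::Sum;

#[derive(Default, Debug, PartialEq)]
struct Stats {
    size: u64,
    count: i32,
}

// Summing references, as in the post: enables `stats.iter().sum()`.
impl<'a> Sum<&'a Stats> for Stats {
    fn sum<I: Iterator<Item = &'a Stats>>(iter: I) -> Self {
        iter.fold(Stats::default(), |acc, s| Stats {
            size: acc.size + s.size,
            count: acc.count + s.count,
        })
    }
}

// Summing owned values: enables `stats.into_iter().sum()`.
impl Sum for Stats {
    fn sum<I: Iterator<Item = Stats>>(iter: I) -> Self {
        iter.fold(Stats::default(), |acc, s| Stats {
            size: acc.size + s.size,
            count: acc.count + s.count,
        })
    }
}
```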

The walk function

Looking at the requirements written as comments, this seems more straightforward to implement:

fn walk(path: &path::Path, sender: &Sender<PathBuf>) -> Stats {
    let mut stat = Stats::default();

    // Optimisation (makes it faster)
    // if !path.is_dir() {
    //     return stat;
    // }

    // read the directory once and handle both outcomes
    match path.read_dir() {
        Err(e) => {
            // e.g. permission denied: report it and skip this folder
            eprintln!("Error {} ({})", e, path.display());
        }
        Ok(dir_items) => {
            for entry in dir_items.flatten() {
                let path = entry.path();
                if path.is_file() {
                    stat.add_file(&path).unwrap();
                } else if path.is_dir() {
                    // publish message to the channel
                    sender.try_send(path).unwrap();
                }
            }
        }
    }
    stat
}
The walk function
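The walk function calls stat.add_file, whose body isn't shown in this post. Below is a minimal sketch of what it could look like, assuming it should add the file's length and bump the count (hypothetical, not the author's code). Since walk calls .unwrap() on the result, a file deleted between listing the folder and reading its metadata would panic there.

```rust
use std::io;
use std::path::Path;

#[derive(Default)]
struct Stats {
    size: u64,
    count: i32,
}

impl Stats {
    // Hypothetical add_file: add the file's length in bytes and count it
    // as one more item. Metadata errors are passed back to the caller.
    fn add_file(&mut self, path: &Path) -> io::Result<()> {
        let len = path.metadata()?.len();
        self.size += len;
        self.count += 1;
        Ok(())
    }
}
```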

The worker function

Next up is the final piece of the puzzle: the worker code, which handles incoming messages until the channel is disconnected or no message arrives within the timeout.

use std::time::Duration;

fn worker(idx: usize, receiver: Receiver<PathBuf>, sender: &Sender<PathBuf>) -> Stats {
    let mut stat = Stats::default();
    // keep walking folders until no message arrives within 50 ms
    while let Ok(path) = receiver.recv_timeout(Duration::from_millis(50)) {
        let newstat = walk(&path, sender);
        stat += newstat;
    }

    stat
}

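The receiver.recv_timeout call blocks until a new message is available or the timeout (50 ms) expires. receiver.recv() looks like an alternative, but here it would never return once the work runs out: it only reports an error when the channel is disconnected, and every worker holds its own Sender clone, so that never happens. The timeout is what lets idle workers finish.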

Once a message is available, we call the walk function with the new path received and then add the Stats returned to the local stat variable.
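Why the timeout matters can be seen in a small experiment. This sketch uses std::sync::mpsc so it runs without any crates; crossbeam_channel's recv and recv_timeout follow the same rule, only reporting disconnection once every Sender has been dropped.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::Duration;

// Returns what recv_timeout reports on an empty channel, first while a
// Sender is alive, then after the last Sender is dropped.
fn empty_channel_results() -> (RecvTimeoutError, RecvTimeoutError) {
    let (tx, rx) = channel::<u32>();

    // A Sender still exists (like the clone each worker holds), so an
    // empty channel can only time out; recv() would block forever here.
    let while_sender_alive = rx.recv_timeout(Duration::from_millis(10)).unwrap_err();

    // Once every Sender is gone the channel is disconnected, and
    // recv()/recv_timeout() return an error straight away.
    drop(tx);
    let after_drop = rx.recv_timeout(Duration::from_millis(10)).unwrap_err();

    (while_sender_alive, after_drop)
}
```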

stat += newstat;
What is going on there?

As I'm exploring Rust, I thought it would look nicer if one Stats object could be added to and assigned into another, something like overloading the += operator. There is a trait for that.

impl std::ops::AddAssign for Stats {
    fn add_assign(&mut self, rhs: Self) {
        self.count += rhs.count;
        self.size += rhs.size;
    }
}

Similarly, to allow let s = stat + newstat I'd need to implement the std::ops::Add trait. You don't need to remember these; simply write the code you wish worked, e.g.

stat = stat + newstat;

Let the compiler tell you what's missing:

error[E0369]: cannot add `Stats` to `Stats`
   --> src/main.rs:132:21
    |
132 |         stat = stat + newstat;
    |                ---- ^ ------- Stats
    |                |
    |                Stats
    |
    = note: an implementation of `std::ops::Add` might be missing for `Stats`

For more information about this error, try `rustc --explain E0369`.

I ♥ this! Coming from OOP languages, I find it a great relief that I don't need to deal with interfaces, abstract classes etc. to introduce shared behaviour in my own code, or to understand "what's what" in someone else's code.

In my own "how can you do this" mental model, if C# is on the far left 🤷 and Go is on the far right ⛔, Rust sits right in the middle.
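Here is what satisfying E0369 could look like: a sketch that implements std::ops::Add on top of the AddAssign impl from above, so the field-by-field logic lives in one place. Debug and PartialEq are derived only for the comparisons:

```rust
use std::ops::{Add, AddAssign};

#[derive(Default, Debug, PartialEq)]
struct Stats {
    size: u64,
    count: i32,
}

// `+=`, as in the post.
impl AddAssign for Stats {
    fn add_assign(&mut self, rhs: Self) {
        self.count += rhs.count;
        self.size += rhs.size;
    }
}

// `+`, which is what the E0369 note asks for. It takes `self` by value,
// reuses `+=` and returns the updated value.
impl Add for Stats {
    type Output = Stats;

    fn add(mut self, rhs: Self) -> Self::Output {
        self += rhs;
        self
    }
}
```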

The end result

Let's compare the performance of the first quick and dirty version with this implementation:

# quick and dirty implementation
$ time dux
Total size is 666559986 bytes (666.56 MB)
dux  0.14s user 3.55s system 50% cpu 7.280 total

# this implementation
$ time dux
Total size is 666559986 bytes (666.56 MB) across 31623 items
dux  0.24s user 3.12s system 76% cpu 4.365 total

That's good enough for now: wall-clock time dropped from 7.28 s to 4.365 s. This exercise wasn't about performance gains, though; it was about trying out different Rust features.

The journey

This was the result of a lot of trial and error over an entire lazy Saturday afternoon. In the first iteration of the optimised implementation, I was not using a struct; instead I only counted the disk space and shared the same variable across all the worker threads (safely, though, using Arc, Mutex and friends, as Rust insists). It looked like this:

fn size_of(path: &path::Path) -> u64 {
    let size = Arc::from(Mutex::new(Box::from(0 as u64))); //<-- yuk
    let mut consumers = Vec::new();
    {
        let (producer, rx) = unbounded();
        let producer = Box::new(producer);

        for idx in 1..10 {
            let producer = producer.clone();
            let rx = rx.clone();
            let size = size.clone();

            consumers.push(std::thread::spawn(move || -> () {
                let p = producer.as_ref().clone();
                worker(idx, rx.clone(), &p, &size);
            }));
        }

        // if in trouble - just `clone` it.  ¯\_(ツ)_/¯
        walk(path, &producer.as_ref().clone(), &size.clone().as_ref());
    }

    for c in consumers {
        c.join().unwrap();
    }

    *size.clone().lock().unwrap().as_ref()  //<-- yuk
}

So I was basically just cloning whenever I was in trouble! I also started with shared state (the size variable), even though this problem can be solved without any shared state at all.

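In my limited testing, this performed almost the same as the final implementation. I am not sure why the Mutex didn't introduce a noticeable performance penalty here; one likely factor is that the run is dominated by file system calls (note the system time in the timings above), so time spent on the lock is small in comparison.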

Anyway, this looked messy to me, so I put in a little more effort to come up with a better implementation.
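For comparison, the shared-state version doesn't need the Box or most of those clones: Arc<Mutex<u64>> plus one Arc clone per thread is enough. A minimal sketch, where each thread adds a hypothetical list of file sizes instead of walking folders:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Shared counter without the extra Box: Arc for shared ownership,
// Mutex for exclusive access while adding.
fn total_with_shared_counter(per_thread: Vec<Vec<u64>>) -> u64 {
    let total = Arc::new(Mutex::new(0u64));

    let handles: Vec<_> = per_thread
        .into_iter()
        .map(|sizes| {
            // One Arc clone per thread is the only clone needed.
            let total = Arc::clone(&total);
            thread::spawn(move || {
                for size in sizes {
                    *total.lock().unwrap() += size;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // All threads are joined; bind the value so the lock guard is
    // dropped before `total` goes out of scope.
    let result = *total.lock().unwrap();
    result
}
```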

clippy the next best friend after rustc

Clippy was a great friend here: it pointed out various places where a clone wasn't necessary, and even suggested rewriting some loops in a much nicer way.

Next step

Not sure if I will come back to this, but if I do, I'll try to make it behave more like a proper CLI tool:

  • Use clap to parse command line arguments and render a --help screen.
  • Output the result in a format that is easier to pipe.

The code is available on GitHub. Feel free to suggest any Rust features I could have used here.

Mohammad Mustakim Ali

I'm a Software Engineer living in London, UK. My passion is to make *very fast* software with great user experience, and I have just got a little better at this than I was yesterday.