Figuring out how to elegantly consume multi-file Parquet data may seem challenging unless you dig into the test cases and the source code for the Rust Parquet crate. The one example given in the documentation is misleadingly simple. I’ll show a couple of examples of how it’s done. Jump to “The Solution” to skip the journey it took to find it.

Multi-File Parquet

Some data science / “big data” tools that emit Parquet-formatted data will write a single “dataset” as a directory full of files. This may be done to allow parallel execution and creation of the output data, or to provide a Parquet dataset partitioned by some useful values for faster response to a particular type of query. Spark is one such tool.

IPUMS produces multi-file Parquet when we produce our large IPUMS data in parallel. In our case each file represents a contiguous slice of the whole dataset.

You might refer to a multi-file Parquet dataset by a single directory name or a list of absolute paths that together make up a dataset. In the examples here I show reading Parquet data when the dataset has a name like “../my_data/dataset-A.parquet/” where the directory has the “.parquet” suffix just as the single-file Parquet data would. But this is just a convention.
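For instance, such a dataset directory might look like this (the part-file naming here follows Spark’s convention; exact names vary by tool):

../my_data/dataset-A.parquet/
    part-1.snappy.parquet
    part-2.snappy.parquet
    part-3.snappy.parquet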

Create an Iterator

In the Rust Parquet library’s high-level record API, you use a RowIter to iterate over a Parquet file and yield records constructed from the columnar data. Optionally you can supply a “schema projection” to cause the reader to read – and the records to contain – only a selected subset of the full schema in that file:

// Straight from the arrow-rs repository documentation

use std::fs::File;
use std::path::Path;
use parquet::file::reader::{FileReader, SerializedFileReader};

let file = File::open(&Path::new("/path/to/file")).unwrap();
let reader = SerializedFileReader::new(file).unwrap();
let mut iter = reader.get_row_iter(None).unwrap();
while let Some(record) = iter.next() {
    println!("{}", record);
}

The key bit is:

let mut iter = reader.get_row_iter(None).unwrap();

Replace that ‘None’ with the optional schema projection:

let mut row_iter = reader.get_row_iter(Some(schema_projection)).unwrap();

For full details on producing this schema projection see my previous post.
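In short, a projection is just a Parquet message type naming the columns you want back. A minimal sketch, assuming the data has an INT32 “id” column (parse_message_type() lives in parquet::schema::parser):

use parquet::schema::parser::parse_message_type;

// Ask for only the `id` column; all other columns are skipped.
let projection_text = "message schema { OPTIONAL INT32 id; }";
let schema_projection = parse_message_type(projection_text).unwrap();

// The reader will now decode only the projected column.
let mut row_iter = reader.get_row_iter(Some(schema_projection)).unwrap();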

The important point is that it appears the API lets you use get_row_iter(None | Some(schema)) as the way to get an iterator over Parquet data. That’s not quite right, though: it only works on a single file reader.

Iterating Over Multiple Files

There’s a deceptively elegant example given in the documentation:

// From the arrow-rs parquet file docs
//
// In the case where you don't need to use .get_row_iter(Option<Type>)
// for a schema projection, the solution is very neat.
//
let paths = vec![
    "/path/to/sample.parquet/part-1.snappy.parquet",
    "/path/to/sample.parquet/part-2.snappy.parquet"
];
// Create a reader for each file and flat map rows
let rows = paths.iter()
    .map(|p| SerializedFileReader::try_from(*p).unwrap())
    // You might think you can put .get_row_iter(schema) here...
    .flat_map(|r| r.into_iter());

// Use the 'row' iterator to get all the rows in all the files 
for row in rows {
    println!("{}", row);
}

To make this sample code much more useful, it needs to accept a “schema projection” as discussed earlier. The serialized reader struct has a get_row_iter(None | Some(schema)) method, so you might think you can call .get_row_iter(schema) on the reader in the flat_map() call.

Trying to use get_row_iter() fails to compile, however. The row iter struct holds a pointer to the schema but doesn’t own it; that much can be remedied with a simple clone of the schema. Even with the clone, though, code like the above example won’t compile with get_row_iter() anywhere in it to project a schema, because the borrow checker isn’t satisfied that we own the reader!
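Concretely, the failing attempt looks something like this (a sketch; assume `schema` is a parquet::schema::types::Type built as described earlier):

// Does NOT compile: the RowIter returned by get_row_iter() borrows
// from `r`, and `r` is dropped when the closure returns.
let rows = paths.iter()
    .map(|p| SerializedFileReader::try_from(*p).unwrap())
    .flat_map(|r| r.get_row_iter(Some(schema.clone())).unwrap());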

Investigating

To understand the issue better we need to view the “arrow-rs” source.

It appears RowIter has a file reader that, depending on construction, may be just a reference:

// from src/record/reader.rs

pub struct RowIter<'a> {
    descr: SchemaDescPtr,
    tree_builder: TreeBuilder,
    file_reader: Option<Either<'a>>,
    current_row_group: usize,

    // ...
}

That ‘Either’ in the file_reader field is:

enum Either<'a> {
    Left(&'a dyn FileReader),
    Right(Box<dyn FileReader>),
}

This makes it so you can’t flat map RowIters built with the “Left” variant and return an iterator owning its readers, even though you can flat map using into_iter() right off of the SerializedFileReader. We need to figure out a way to get a RowIter constructed with the “Right” variant, which owns its reader.

It looks like we need this method in order to get an iterator that owns its reader:

// from src/record/reader.rs

/// Creates a iterator of [`Row`](crate::record::Row)s from a
/// [`FileReader`](crate::file::reader::FileReader) using the full file schema.
pub fn from_file_into(reader: Box<dyn FileReader>) -> Self {
Reviewing the source, get_row_iter(schema) isn’t the only way to ask for a schema projection; you can also call “project(schema)” on the result of RowIter::from_file_into(...). Pass RowIter::from_file_into() a boxed serialized file reader and you get back a row iterator that owns its reader, which you can then narrow with project(schema).
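On a single file that looks like this (a sketch; ‘boxed_reader’ is assumed to be a Box<dyn FileReader> wrapping a SerializedFileReader, and ‘projection’ a schema Type as before):

// from_file_into() stores the reader as the owned "Right" variant,
// and project() applies the schema projection.
let row_iter = RowIter::from_file_into(boxed_reader)
    .project(Some(projection))
    .unwrap();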

Before spending more time trying to figure out exactly how to code the solution, I reviewed the tests, figuring that the ‘Either’ variant and RowIter::from_file_into() must get tested, right?

Sure enough:

fn test_file_reader_iter_projection() {
    let path = get_test_path("alltypes_plain.parquet");
    let values = vec![path]
        .iter()
        .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap())
        .flat_map(|r| {
            let schema = "message schema { OPTIONAL INT32 id; }";
            let proj = parse_message_type(&schema).ok();

            RowIter::from_file_into(Box::new(r)).project(proj).unwrap()
        })
        .map(|r| format!("id:{}", r.fmt(0)))
        .collect::<Vec<_>>()
        .join(", ");

    assert_eq!(values, "id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1");
}

The crucial difference is the line:

RowIter::from_file_into(Box::new(r)).project(proj).unwrap()

that replaced

r.into_iter()

in the flat_map().

The Solution

We map a collection of serialized file readers from the multiple files. Then we box each reader up, make a RowIter from it with RowIter::from_file_into(), apply the schema projection with project(), and flat map the results.

The code would look like this:

fn example_multifile(
    &self,
    path: &str,
    schema: parquet::schema::types::Type,
) {
    // Return all parquet files in a directory, and all
    // subdirectories:
    let paths = Extractor::get_all_parquet_parts(path);

    let rows = paths.iter()
        .map(|p| SerializedFileReader::try_from(p.as_str()).unwrap())
        .flat_map(|r| {
            RowIter::from_file_into(Box::new(r))
                .project(Some(schema.clone()))
                .unwrap()
        });

    for row in rows {
        println!("{}", row);
    }
}

Elegant, and much more powerful than being forced to read rows with all columns included.
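A note on Extractor::get_all_parquet_parts(): that’s our own helper, not part of the Parquet crate. A minimal stand-in using std::fs might look like this (unlike the real helper it only scans a single directory level, and it skips error handling a real version would want):

use std::fs;

// Hypothetical helper: collect the paths of all ".parquet" files
// directly inside `dir`.
fn get_all_parquet_parts(dir: &str) -> Vec<String> {
    fs::read_dir(dir)
        .expect("Couldn't read the dataset directory")
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .filter(|p| p.extension().map_or(false, |ext| ext == "parquet"))
        .map(|p| p.to_string_lossy().into_owned())
        .collect()
}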

A Less Streamlined Alternative

I investigated the above solution partly because I was learning Rust and simply wanted to understand the Parquet library better. But surely, given the nice row iterator in the sample code, there had to be a slightly different way to do the same thing with a schema projection?

If you let go of the goal of having a single iterator over the multi-file Parquet data, you can use a for_each() and create the serialized reader and row iterator inside the for_each() block for each file, avoiding most of the ownership complexities.

This is from working code that reads multi-file data and formats it as CSV:

// ... after obtaining the paths like before 

paths.iter()
    .for_each(|p| {
        let file = File::open(&Path::new(p))
            .expect(&format!("Couldn't open parquet data at {}", &p));

        let reader = SerializedFileReader::new(file).unwrap();
        let mut row_iter = reader
            .get_row_iter(Some(schema.clone()))
            .expect("Cannot create row iterator on input parquet with given schema.");

        while let Some(record) = row_iter.next() {
            let csv_row = self.format_row(&record, &delimiter);
            out_writer
                .write(format!("{}{}", csv_row, line_ending).as_bytes())
                .expect(&format!(
                    "Write failed at row: {}, writing to {}.",
                    rows_read, &output_name
                ));
        }
    });