Introduction

Update (2018-10-19): The specific instructions for building the Parquet and Arrow libraries in this post are out of date as of the most recent major release of Arrow. The rest is still correct and useful. See the Arrow homepage for instructions. In essence, you now build both the Parquet and Arrow libraries from the Arrow project; building the Parquet libraries is optional, so you can build only the Arrow library if you prefer.


For a number of reasons you may wish to read and write Parquet format data files from C++ code rather than using pre-built readers and writers found in Apache Spark, Drill, or other big data execution frameworks.

For one, you can make more efficient use of limited resources; you may also want to create a library for an existing C++ application or for another language. If nothing else, it’s very convenient to be able to create stand-alone utilities that read and write the Parquet format, or to give your existing applications that capability.

In my case the unusual data format of IPUMS micro-data necessitated a custom conversion tool of some sort. Also, I knew I needed to convert large batches of data quickly, so C++ seemed the best approach for conserving memory and getting fast execution. With an understanding of columnar data and after studying the “parquet-cpp” API I knew this would be possible.

What follows are my notes on building and using the parquet-cpp C++ Parquet library. I’ll also show parts of two utilities: make-parquet and tabulate-pq. The former creates Parquet formatted files out of CSV or fixed-width formatted data and the latter reads and tabulates data from Parquet files.

The Arrow and Parquet API

The parquet-cpp library has a low-level API, which is what I used to build “tabulate-pq” and “make-parquet”. There’s a higher level API that could be used to write a tool similar to “tabulate-pq”, and it includes support for the Arrow in-memory data storage library.

In the rest of this post I will show how to use both C++ APIs with some example code.

Among other things, Arrow makes it easier to deal with mixed column types dynamically in C++, and it allows passing data around without excessive copying, which saves memory and time. Arrow defines a type of in-memory table built from an Arrow Schema and columns of data.

The Arrow and Parquet low-level and high-level APIs are defined in the http://github.com/apache/parquet-cpp library. For documentation of the API, see the /tools/ and /examples/ directories. The examples contain two programs: one demonstrating use of Arrow and Parquet together, and a similar program implemented with the low-level Parquet API.

Low-level interface to Parquet

When I began writing C++ tools to handle Parquet formatted data the low-level API was the only interface to the library, so that’s what I used to make “make-parquet.”

The parquet-cpp library gives you these types:

  • parquet types to group together into a schema
  • parquet::ParquetFileReader
  • parquet::ParquetFileWriter
  • parquet::RowGroupReader
  • parquet::RowGroupWriter
  • parquet::ColumnReader
  • parquet::ColumnWriter

You call ColumnReader::ReadBatch() and ColumnWriter::WriteBatch() to actually move data in and out of Parquet files; compression and buffering get handled by the library.

Detailed working code examples and build instructions follow.

Once you’ve extracted data from a data source, say a CSV or hierarchical fixed-width text file, the core of the “make-parquet” program looks like:


/* 
Handles int32 and string types; you could extend it to handle floats and larger ints by
adding and handling additional types of buffers.

The new high-level interface supports a variant type that would allow you to pass all data
buffers as a single argument; that's less memory efficient than what I do here, but not by much.

To avoid one argument per data type while using the low-level interface, you could defer conversion
from raw string data until right before sending to WriteBatch(). However, this means that buffering
a single untyped (represented by strings), optimally sized row group in RAM requires much more space,
perhaps four to five times as much. If you ran this utility in batches, you'd run out of memory
before running out of CPU cores on most systems; with good row group sizes you could easily exceed 5,
10, or even 20 GB.

The perfect solution in terms of RAM would be to know in advance exactly how many row groups you will
consume and their sizes, removing the need to buffer at all; but that would necessitate scanning the
input data in advance to compute row group sizes, which is time-consuming on its own. This is all a
result of needing to set the row group size before writing to the row group.
*/

static void write_to_parquet(
	const std::map<int, std::vector<std::string>> & string_buffers,
	const std::map<int, std::vector<int32_t>> & int_buffers,
	std::shared_ptr<parquet::ParquetFileWriter> file_writer,
	int row_group_size,
	const std::vector<VarPtr> & output_schema){

	// Create a row group in the Parquet file. The row group size should be rather large
	// for good performance, so that row_group_size * columns == 1GB
	parquet::RowGroupWriter* rg_writer =
		file_writer->AppendRowGroup(row_group_size);

	// Columns must be written in order, so the order of output_schema matters.
	for (int col_num = 0; col_num < static_cast<int>(output_schema.size()); col_num++){
		// Grab the description of a column
		auto var = output_schema[col_num];

		// Figure out the type of data and where to get it from
		if (var->type == parquet_type::_int){
			auto column_writer =
				static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
			auto & data_to_write = int_buffers.at(col_num);
			column_writer->WriteBatch(
				data_to_write.size(), nullptr, nullptr, data_to_write.data());
		}else if (var->type == parquet_type::_string){
			// This is how UTF-8 strings are handled at the low level
			auto & data_to_write = string_buffers.at(col_num);
			auto column_writer =
				static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
			for (const std::string & item : data_to_write){
				parquet::ByteArray value;
				int16_t definition_level = 1;
				value.ptr = reinterpret_cast<const uint8_t*>(item.c_str());
				// Assumes each item holds at least var->width bytes
				value.len = var->width;
				column_writer->WriteBatch(1, &definition_level, nullptr, &value);
			}
		}else{
			std::cerr << "Type " << var->type_name << " not supported." << std::endl;
			exit(1);
		}
	}
}
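The buffers passed to write_to_parquet() have to be filled somewhere first. As a minimal sketch of that step, assuming fixed-width input lines, the code below slices each line into typed per-column buffers keyed by column position. FixedWidthColumn, ColumnKind, TypedBuffers and buffer_line are hypothetical names made up for this illustration; they are not part of make-parquet or parquet-cpp.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical description of one field in a fixed-width record.
enum class ColumnKind { Int32, String };

struct FixedWidthColumn {
  std::string name;
  ColumnKind kind;
  std::size_t start;  // zero-based offset of the field in the line
  std::size_t width;  // number of characters in the field
};

// Typed buffers keyed by column position, the same shape that
// write_to_parquet() takes as arguments.
struct TypedBuffers {
  std::map<int, std::vector<int32_t>> int_buffers;
  std::map<int, std::vector<std::string>> string_buffers;
};

// Slice one line and append every field to the buffer for its type.
// Converting to int32 here, rather than buffering raw strings, is what
// keeps the buffered row group small.
void buffer_line(const std::string& line,
                 const std::vector<FixedWidthColumn>& layout,
                 TypedBuffers& out) {
  for (int col = 0; col < static_cast<int>(layout.size()); ++col) {
    const FixedWidthColumn& c = layout[col];
    assert(c.start + c.width <= line.size());
    const std::string raw = line.substr(c.start, c.width);
    if (c.kind == ColumnKind::Int32) {
      // std::stol skips leading blanks; it throws on an all-blank field.
      out.int_buffers[col].push_back(static_cast<int32_t>(std::stol(raw)));
    } else {
      out.string_buffers[col].push_back(raw);
    }
  }
}
```

You would call buffer_line() once per input line until a row group's worth of rows has accumulated, hand the two maps to the writer, and then clear them before starting the next row group.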

Arrow

First, a quick overview of important types from the Arrow library; some example code follows.

Arrow provides the following types to make storing columnar data in memory and reading and writing the Parquet format more convenient and fast:

  • arrow::Array
  • arrow::Schema
  • arrow::Field
  • arrow::Table

For moving to and from Parquet:

  • arrow::io::ReadableFile
  • parquet::arrow::FileReader
  • arrow::io::FileOutputStream
  • parquet::arrow::WriteTable

Arrow features data structures called Array that hold columns of same-type data, filled by a “builder” of a given type:

	arrow::Int64Builder i64builder;
	i64builder.Append({1, 2, 3, 4});

	std::shared_ptr<arrow::Array> i64array;
	i64builder.Finish(&i64array);

	arrow::StringBuilder strbuilder;
	strbuilder.Append("one");
	strbuilder.Append("two");
	strbuilder.Append("three");
	strbuilder.Append("four");

	std::shared_ptr<arrow::Array> strarray;
	strbuilder.Finish(&strarray);

You make Arrow “tables” by combining an Arrow schema object with Arrow data:

	// Make the schema by supplying fields, each field needs a name and type:
	auto schema = arrow::schema(
	      {arrow::field("int", arrow::int64()),
	      arrow::field("str", arrow::utf8())});
	/*
	 The data table needs the schema and the data. The data is from 
	 arrow::Array objects, and they must be in the same order as the
	 fields in the schema.
	 */
	auto data_table = arrow::Table::Make(schema, {i64array, strarray});

You can save Arrow tables into Parquet files (see the example programs that ship with parquet-cpp).

To read selected columns into Arrow Arrays, an ability crucial for a tool like “tabulate-pq”, you can use the Arrow wrapping of the Parquet API:

	std::shared_ptr<arrow::io::ReadableFile> infile;
	arrow::io::ReadableFile::Open(
	      "parquet-arrow-example.parquet", arrow::default_memory_pool(), &infile);

	// Attach an arrow::io::ReadableFile to a Parquet file reader:
	std::unique_ptr<parquet::arrow::FileReader> reader;
	parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader);

	// Here we can make an Arrow Array object without knowing
	// the type of data it will hold.
	std::shared_ptr<arrow::Array> array;

	// Read the zeroth column; you read columns by position in the schema.
	reader->ReadColumn(0, &array);

	arrow::PrettyPrint(*array, 4, &std::cout);

The concept of row groups is important: if you’re memory constrained, you may need to read in one row group’s worth of a column at a time (these are known as column chunks). This way you can read in part of a column, deal with the data by performing some reduce operation, and dispose of the memory before moving on to the next row group.

/*
The setup goes exactly as before but we call the RowGroup() method, passing in the row group number.
You can chain the return of RowGroup() and call Column(), passing in the column number you want,
 and call the Read() method on the returned Column object.
 */
	// (just for instance)
	int column_number = 8;
	int row_group_number = 3;

	reader->RowGroup(row_group_number)->Column(column_number)->Read(&array);
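
To illustrate why reading one column chunk at a time keeps memory bounded, here is a minimal sketch that does not use Arrow at all. ChunkLoader and sum_column_by_row_group are hypothetical names; the loader is only a stand-in for the RowGroup()/Column()/Read() call and returns one row group's worth of int32 values. Only one chunk is alive at any time, and the reduce operation (a sum here) carries a single running value from chunk to chunk.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <numeric>
#include <vector>

// Hypothetical stand-in for reading one column chunk: given a row group
// number, return that row group's values for the column of interest.
using ChunkLoader = std::function<std::vector<int32_t>(int)>;

// Sum a column by visiting its chunks one row group at a time.
int64_t sum_column_by_row_group(int num_row_groups,
                                const ChunkLoader& load_chunk) {
  assert(num_row_groups >= 0);
  int64_t total = 0;
  for (int rg = 0; rg < num_row_groups; ++rg) {
    // The chunk lives only for this iteration, so peak memory is one
    // chunk rather than the whole column.
    const std::vector<int32_t> chunk = load_chunk(rg);
    total = std::accumulate(chunk.begin(), chunk.end(), total);
  }
  return total;
}
```

Any reduce that can be updated incrementally (counts, sums, minimum and maximum, a frequency table) fits this pattern; operations that need the whole column at once, such as an exact median, do not.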

Use the arrow::Table class to read in several columns in one call (optionally using parallel threads).


	// Assuming we have set up the reader as in previous snippets...
	reader->set_num_threads(4);

	std::shared_ptr<arrow::Table> table;

	// Read a selected set of columns into the table
	// assuming we know what those columns are...
	std::vector<int> selected_columns = {5, 9, 23, 28};
	reader->ReadTable(selected_columns, &table);

	// Now the table has data for the selected columns
	for (int c = 0; c < static_cast<int>(selected_columns.size()); c++){
		auto this_column = table->column(c);
		std::cout << "name: " << this_column->name() << "\n";
	}

The core of a very simple memory efficient tabulator:

/*
Suppose we can determine the column indices to
extract by matching column names in the schema
with their positions...
*/
	vector<int> columns_to_tabulate = get_from_schema({"PERWT", "AGE", "MARST"});

	// Can extract columns in parallel. This won't help with bottlenecked I/O, but
	// CPU time gets spent on decompressing column data, which can run in parallel.
	reader->set_num_threads(4);

	int rg = reader->num_row_groups();

	// You can read into a table just a group of rows
	// rather than the entire Parquet file...
	for(int group_num=0;group_num<rg;group_num++){
		std::shared_ptr<arrow::Table> table;
		
		//  Fill the table with data from a single row group:
		reader->ReadRowGroup(group_num, columns_to_tabulate, &table);
		
		// Find out how many rows were in the row group:
		auto rows = table->num_rows();

		// A raw version of the data in the Arrow table
		//  ordered by column in the arrow schema
		vector<const int32_t*> raw_data;


		// We pull out a list of pointers to raw data so that
		// we can produce an arbitrary length record as long as
		// the data type is known up-front. This way we can support
		// tabulating one, two, three, four or even more columns.
		for(int c=0;c<columns_to_tabulate.size();c++){
			auto column_data = std::static_pointer_cast<arrow::Int32Array>(
				table->column(c)->data()->chunk(0));
			raw_data.push_back(column_data->raw_values());
		}

		// There is an experimental API in the works to automate conversion
		// from columnar to record layouts, but for now it's manual ...
		for(int row_num=0;row_num<rows;row_num++){
			vector<int32_t> record;
			for (int c=0;c<columns_to_tabulate.size();c++){
				auto datum = raw_data[c][row_num];
				record.push_back(datum);
			}
			// First column is assumed to be weight, the rest get crossed
			// with each other and the counts weighted.
			add_to_tabulation(record);
		}

	}
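
The helpers get_from_schema() and add_to_tabulation() are not shown above. As a rough sketch of what they might do, under my own assumptions, the code below looks up column positions by name and accumulates weighted counts in a map keyed by the crossed values. The names column_indices and WeightedCounts are hypothetical, and both functions take their inputs as explicit arguments, so their signatures differ from the calls in the snippet.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical lookup: map each wanted column name to its position in
// the list of schema column names, preserving the requested order.
std::vector<int> column_indices(const std::vector<std::string>& schema_names,
                                const std::vector<std::string>& wanted) {
  std::vector<int> indices;
  for (const std::string& name : wanted) {
    auto it = std::find(schema_names.begin(), schema_names.end(), name);
    if (it == schema_names.end()) {
      throw std::runtime_error("No such column: " + name);
    }
    indices.push_back(static_cast<int>(it - schema_names.begin()));
  }
  return indices;
}

// One cell of the cross-tabulation per distinct combination of values.
using WeightedCounts = std::map<std::vector<int32_t>, int64_t>;

// The first element of the record is the weight; the remaining elements
// form the key of the cell whose weighted count is increased.
void add_to_tabulation(WeightedCounts& counts,
                       const std::vector<int32_t>& record) {
  assert(!record.empty());
  const std::vector<int32_t> key(record.begin() + 1, record.end());
  counts[key] += record[0];
}
```

Because the key is a vector, the same code handles one, two, or more crossed columns. A std::map also keeps the cells sorted by value, which is convenient when printing the finished table.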

Building and Using the Parquet and Arrow Libraries

The Parquet libraries use CMake to build and in general follow standard practice. I’ll just briefly run through the easy path to building the Parquet libraries. Then I give a bit of a C++ build and link refresher for those who aren’t sure what to do next once they have built the libraries.

First download the source; the tools and libraries that the build requires are described below:

git clone http://github.com/apache/parquet-cpp parquet-cpp-dir

Builds require CMake 3.2 or later; Ubuntu 14.04 ships with 2.8, so there you will need to build CMake from source. On older Linux distributions you may also need to build a version of curl that supports https from source before installing CMake. Finally, gcc 4.8 or later is required, which ships with Ubuntu 14.04 and higher.

I mention Ubuntu only because it’s what I use and is probably most common; builds should work on nearly any distribution. You can even build on macOS without difficulty with the assistance of Homebrew. My development has mostly taken place on an Ubuntu 16.04 distribution running on WSL (Windows 10).

Next, install some Boost libraries and standard UNIX build tools if you don’t have them. On Ubuntu, assuming you have sudo rights:

sudo apt-get install libboost-dev libboost-filesystem-dev \
                     libboost-program-options-dev libboost-regex-dev \
                     libboost-system-dev libboost-test-dev \
                     libssl-dev libtool bison flex pkg-config

On macOS you can simply install all of Boost, after ensuring you have Xcode 6 or later:

brew install boost

Build Parquet Locally

Simply generate the makefile with:

cd parquet-cpp-dir
cmake .

Then make everything; libraries and example programs will get placed in parquet-cpp-dir/build/latest.

make

Of course, deviating from the happy path is where things get tricky; see the README and the CMakeLists.txt for some help. This build will get you access to binary libraries and let you play with example code in parquet-cpp-dir/examples.

Among other things, the -DPARQUET_LINKAGE_TYPE=static build type has never worked for me even though the notes in CMakeLists.txt indicate it should.

In general, to statically link – which is not the default – you will need to build Boost from source with -fPIC on; the version you get from ‘apt-get’ on Ubuntu is not compiled with this flag and will not work with static builds.

Build Arrow and Parquet Separately and Install

To make life easier down the road, you may want to install the libraries in the standard locations /usr/local/… or in some other location on your system to which you can point the PARQUET_HOME and ARROW_HOME environment variables. The parquet-cpp build scripts for the example programs pick up these variables, and they also simplify your own builds that use the libraries.

Build Arrow on its own:

git clone http://github.com/apache/arrow arrow-dir
cd arrow-dir/cpp
cmake . -DCMAKE_INSTALL_PREFIX=$ARROW_HOME
make install

Then build Parquet, having set the ARROW_HOME environment variable so that the build uses this version of Arrow:

git clone http://github.com/apache/parquet-cpp parquet-cpp-dir
cd parquet-cpp-dir
cmake . -DCMAKE_INSTALL_PREFIX=$PARQUET_HOME
make install

Using the Libraries with CMake Builds

Check the parquet-cpp-dir/examples/parquet-arrow/ directory for a sample CMake project that incorporates Arrow and Parquet libraries in a C++ application. To build the example:

cd parquet-cpp-dir/examples/parquet-arrow
cmake .
make

If you have ARROW_HOME and PARQUET_HOME defined and pointing to compatible libraries, the build will go smoothly. If you don’t define the locations of installed libraries, you have to build Parquet locally first, and then the build will use the local versions of both (parquet-cpp pulls down and builds a local version of Arrow).

Remember that if you distribute your binary, the Parquet and Arrow libraries must sit at the same locations on target systems as your PARQUET_HOME and ARROW_HOME, unless they are installed in standard locations there.

Simple Builds with Make

In the easy case, where you installed to somewhere in the search path, the Makefile can be as simple as:

example:
	c++ example.cpp -o example -lparquet -larrow

If you’ve built libraries somewhere else, perhaps to test out the latest version available without disrupting the installed versions, you would do something like:

PARQUET_HOME:= /home/me/dev/parquet_lib
ARROW_HOME:= /home/me/dev/arrow_lib


example: 
	c++ -std=c++1y example.cpp -O3 \
	-L${ARROW_HOME}/lib -L${PARQUET_HOME}/lib \
	-I${ARROW_HOME}/include -I${PARQUET_HOME}/include \
	-o example -lparquet -larrow \
	-Wl,-rpath,${ARROW_HOME}/lib -Wl,-rpath,${PARQUET_HOME}/lib
	

To make a dynamically linked binary for distribution, you could ship the libraries alongside your own binary and set LD_RUN_PATH (or pass -rpath on the gcc command line) so that the run path points at the deployed libraries relative to the binary. In this example, the files from $PARQUET_HOME/lib and $ARROW_HOME/lib are expected to be found in a lib directory next to the binary ($ORIGIN/lib).

Set the run path then build:

export LD_RUN_PATH='$ORIGIN'/lib
make example

and the Makefile would look like:

PARQUET_HOME:= /home/me/dev/parquet_lib
ARROW_HOME:= /home/me/dev/arrow_lib


example: 
	c++ -std=c++1y example.cpp -O3 \
	-L${ARROW_HOME}/lib -L${PARQUET_HOME}/lib \
	-I${ARROW_HOME}/include -I${PARQUET_HOME}/include \
	-o example -lparquet -larrow

Before running, ensure that wherever you place the binary there is a ./lib/ directory next to it containing all the needed libraries that do not otherwise reside on the standard library search path.

Finally, don’t forget you can use ‘ldd’ to verify which libraries your own library or binary links to.