The Parquet columnar data format typically has columns of simple types: int32, int64, string and a few others. However, columns can also have the logical types “List” or “Map”, and their members may be more “List” or “Map” structures or primitive types.
Not all tools that read Parquet can handle these complex nested column types. Also, exploring a schema with these types can be a bit difficult as the nesting properties can be represented poorly. I’ll go through a few things to try to better understand a schema and then at least one approach to convert this kind of Parquet into a simpler format.
Here’s a snippet of the kind of Parquet schema I’m talking about, formatted by DataFusion:
id: int32 not null
ds46_gv6: string not null
ds46_gv2: string not null
...
ds46_dt1: map<string, decimal128(9, 0)> not null
child 0, entries: struct<key: string not null, value: decimal128(9, 0)> not null
child 0, key: string not null
child 1, value: decimal128(9, 0)
ds46_dt2: map<string, decimal128(9, 0)> not null
child 0, entries: struct<key: string not null, value: decimal128(9, 0)> not null
child 0, key: string not null
child 1, value: decimal128(9, 0)
ds46_dt3: map<string, decimal128(9, 0)> not null
child 0, entries: struct<key: string not null, value: decimal128(9, 0)> not null
child 0, key: string not null
child 1, value: decimal128(9, 0)
Notice the nice indentation. It shows the names of the columns (no indentation) and the data types for the keys and values in the columns with “Map” type data. This is part of the schema for some IPUMS NHGIS Parquet data created by Spark. Spark doesn’t automatically put data into maps, but it’s possible, and was done here for Spark read performance reasons.
A problem (among several) with the above structure is that you don’t know the names of the keys in the “Map” type columns. So, it’s not obvious how to construct a query to extract just some of the data from a “Map” type column. You can get the key names by first selecting data from a map column and selecting all the keys, but that’s cumbersome.
Why?
Why would you want to simplify Parquet with list or map type columns? Does it even make sense to do so?
- Speed up queries: You might wish to eliminate map or list data from your Parquet because it slows down large queries. Extracting an entire series full of a particular key’s values isn’t especially fast.
- More complex to extract: It’s also just more complicated to construct queries or programmatically extract data from “Map” columns.
- Consistency: Different tools will provide different ways to get at the key-value data or list items which can be confusing.
- Usability: Finally, your tool of choice may not support these nested types well or at all, and you require a different format to do anything with the data.
It makes sense to simplify nested columns when they have the same number of members and the same keys (if “Map” type) on all rows. In these cases you probably never needed “Map” type columns from a logical point of view.
On the other hand, if your data has varying list lengths or different keys or key counts from row to row, you can’t do much to “flatten” the data easily, though you could perhaps fill out shorter entries with null values. It depends. These are the cases for which “List” and “Map” types were intended.
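If you do decide to pad ragged maps with nulls, the idea looks roughly like the sketch below. It works on plain Python dicts standing in for map values; the function name and the sample keys are made up for illustration.

```python
def flatten_ragged_maps(rows: list[dict]) -> tuple[list[str], list[list]]:
    """Turn rows of key-value maps into a fixed set of columns.

    Keys missing from a row become None (null), so every output row
    has the same width.
    """
    # Collect the union of keys, keeping first-seen order for stable columns.
    columns: list[str] = []
    seen = set()
    for row in rows:
        for key in row:
            if key not in seen:
                seen.add(key)
                columns.append(key)
    # dict.get() returns None for keys a row does not have.
    table = [[row.get(col) for col in columns] for row in rows]
    return columns, table


rows = [{"a": 1, "b": 2}, {"b": 3, "c": 4}]
columns, table = flatten_ragged_maps(rows)
```

Whether a mostly-null wide table is an improvement over the map column depends on how sparse the keys are.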
In cases where the “Map” or “List” data has the same shape throughout the Parquet data, the only reason to nest the data is for performance in Spark. Combining data into “Map” type columns reduces the overall column count in the schema. This used to be (maybe still is?) a solution for Spark’s tendency to slow way down on wide data, something like 1000+ columns. This happened even when selecting just one column, which should not be, given Parquet’s design; other tools have no performance issues with super wide Parquet files. Using vector or map types was a common suggestion as a workaround in Spark, so you may encounter data with this structure when it really doesn’t need it anymore.
Viewing the schema as machine-readable data
Two tools I found to work well were DuckDB and DataFusion. Using PyArrow or Polars directly to read the schema failed for me, though PyArrow can certainly represent these complex types. I’m sure there are other approaches: Spark, for one, should do fine for getting at a schema. I know Polars will support more column types in the future. Polars uses Arrow2 which should support “Map” and “List” columns according to the documentation.
Here I’m primarily concerned with getting a nice, machine readable representation of the schema with already built tools. For just learning about a new-to-you schema, try the VisiData tool to interactively explore a file, at least if it’s not too large.
I used the Python libraries for DuckDB and DataFusion. You can do basically the same operations from the DuckDB command line or from DataFusion in Rust, respectively.
The combination of DataFusion with PyArrow gives the easiest to interpret results when formatting and printing. Be sure to “pip install” datafusion and pyarrow first.
from datafusion import SessionContext
ctx = SessionContext()
ctx.register_parquet("example","ge_00.parquet")
# This is getting a DataFusion data frame of the query result
df = ctx.sql("select * from example limit 1")
df.schema()
Printing the schema out as a string produces a nicely indented layout, as I showed at the top of this post. To programmatically read the schema, index over the .types and .names attributes, or index using the field(index) method or field_by_name() method.
s = df.schema()
for name in s.names:
    field = s.field_by_name(name)
    print(f"{name}: {field.type}")
Then you can handle all your column type dependent logic in Python or Rust.
DuckDB gives similar information, but doesn’t structure the output as nicely. It returns the schema as a result set, just like other queries. It’s not obvious how to deal with deeply nested data where you have maps within maps. Actually extracting poses no difficulty but programmatically reading that structure from the schema table could be difficult. Fortunately for me, the problem I had last week pertained to a structure one level deep.
select name, logical_type, converted_type from parquet_schema('file.parquet');
You work with the results as you’d expect to with a database.
D select name,converted_type,num_children from parquet_schema('data.parquet/part-00000-df83d567-687a-467a-9f42-9acc1ad73a09.snapp
┌───────────────────┬────────────────┬──────────────┐
│ name │ converted_type │ num_children │
│ varchar │ varchar │ int64 │
├───────────────────┼────────────────┼──────────────┤
│ spark_schema │ │ 33 │
│ id │ │ │
│ ds46_gv6 │ UTF8 │ │
│ ds46_gv2 │ UTF8 │ │
│ ds46_gv5 │ UTF8 │ │
│ ds46_gv3 │ UTF8 │ │
│ ds46_gv1 │ UTF8 │ │
│ · │ · │ · │
│ · │ · │ · │
│ · │ · │ · │
│ ds46_dt4 │ MAP │ 1 │
│ key_value │ │ 2 │
│ key │ UTF8 │ │
│ value │ DECIMAL │ │
│ ds46_dt5 │ MAP │ 1 │
│ key_value │ │ 2 │
│ key │ UTF8 │ │
│ value │ DECIMAL │ │
│ ds46_dt6 │ MAP │ 1 │
│ key_value │ │ 2 │
│ key │ UTF8 │ │
│ value │ DECIMAL │ │
├───────────────────┴────────────────┴──────────────┤
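For deeper nesting, the num_children column is enough to rebuild the tree, because the rows come back in depth-first order with each parent directly followed by its children. The following is a sketch of that idea, not something I needed for my one-level-deep data. The Node class and build_tree() are names invented here, and the rows are typed in by hand from a shortened version of the output above, with empty cells written as None.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Node:
    name: str
    converted_type: Optional[str]
    children: list["Node"] = field(default_factory=list)


def build_tree(rows: list[tuple]) -> Node:
    """Rebuild nesting from (name, converted_type, num_children) rows.

    Assumes the rows are in depth-first order, with each parent
    immediately followed by its children.
    """
    position = 0

    def read_node() -> Node:
        nonlocal position
        name, converted_type, num_children = rows[position]
        position += 1
        node = Node(name, converted_type)
        for _ in range(num_children or 0):  # None means a leaf column
            node.children.append(read_node())
        return node

    return read_node()


# Rows typed in by hand from a shortened version of the output above.
rows = [
    ("spark_schema", None, 2),
    ("id", None, None),
    ("ds46_dt4", "MAP", 1),
    ("key_value", None, 2),
    ("key", "UTF8", None),
    ("value", "DECIMAL", None),
]
root = build_tree(rows)
```

With the tree in hand, a map inside a map is just a “MAP” node somewhere below another “MAP” node.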
One other complication with DuckDB: You use the parquet_schema() function to read a Parquet schema as a table, but when dealing with multi-part Parquet it returns the union of all columns. So, if you know a whole directory has data using the same schema, just pick one file and read it, like I did in this example. One might expect it to validate that all files have the same schema and if so, return it or error out, but it doesn’t do that. When using select to get data, it scans all files and correctly selects rows. You can select from a logical Parquet file with a glob “/*.parquet”. However, the parquet_schema() function doesn’t work the same way with globbing.
Converting the schema and flattening the data
I found DuckDB to be the easiest tool to work with the schema in SQL, and extract and flatten the data out. This is done mostly with SQL, making the solution really easy to understand and pretty portable. While you need some programming language to make manipulating the result sets easier, it’s a very thin layer on top of the database. This is great for future-proofing and documentation. Less technical users who know SQL can understand it.
The DataFusion approach is probably more powerful though, and not too hard either. In the future I’d make my decision based on which tool performs best on large data and has the fewest bugs. Right now the two seem pretty even. The biggest problem I ran into was when a logical Parquet file is broken into thousands of parts. Querying these with DuckDB beyond about 5,000 files requires more memory than it ought to.
Step 1: Discover the schema
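from pathlib import Path

import duckdb


def get_map_type_columns(parquet_file: Path) -> list[str]:
    con = duckdb.connect()
    # Map columns have converted_type 'MAP'; the name filter skips
    # nested entries that are themselves called key/value.
    con.execute(
        f"select name, converted_type from parquet_schema('{str(parquet_file)}') where converted_type = 'MAP' and name not in ('key','value','keys','values')"
    )
    results = con.fetchall()
    con.close()
    print(f"There are {len(results)} MAP type columns...")
    # Each result row is (name, converted_type); keep only the name.
    return [r[0] for r in results]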
Remember, you could do the same from the DuckDB REPL tool or other languages.
Step 2: Generate new schema with a complex ‘select’ and aliases
Now having a list of the map columns, get their keys and map each one to an alias to use in a future select statement.
For each “Map” type column discovered, query for its keys like this:
select map_keys(column_name) from 'file.parquet' limit 1;
This assumes all rows have the same set of keys for the map column, and it is very fast. If you don’t know whether they do, try something like:
select distinct(map_keys(column_name)) from 'file.parquet';
Keep in mind this will possibly run a long time on a big file. If the logical parquet file is broken into many parts, you must have 'file.parquet/*.parquet' in the from clause; in those cases be ready to wait a long time to scan every file.
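Once the distinct query finishes, you still have to decide whether the answer means “same keys everywhere”. Here is a small sketch of that check. It assumes the rows come back from fetchall() as one-element tuples holding a Python list of key names, and it compares keys as sets in case the same keys show up in a different order on some rows. The function name is made up.

```python
def keys_are_uniform(rows: list[tuple]) -> bool:
    """True if every distinct key list contains the same set of keys.

    `rows` is shaped like fetchall() output for the distinct query:
    one tuple per row, holding one list of key names.
    """
    key_sets = {frozenset(keys) for (keys,) in rows}
    return len(key_sets) <= 1


same_keys_different_order = [(["k1", "k2"],), (["k2", "k1"],)]
different_keys = [(["k1", "k2"],), (["k1", "k3"],)]
```

If this returns False for a column, that column is one of the cases where the map type is doing real work and flattening needs more thought.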
After collecting the keys, make aliases to add to a select statement that will look like:
select
  map_col1['key1'][1] as map_col1_key1,
  map_col1['key2'][1] as map_col1_key2,
  map_col2['key3'][1] as map_col2_key3,
  map_col2['key1'][1] as map_col2_key1,
  ...
from 'file.parquet/*.parquet'
Use index 1, not 0, to extract the value: the bracket lookup returns a list here, and DuckDB lists are 1-indexed.
Here’s how I generated the select statement with Python:
def flatten_map_columns(parquet_file: Path) -> list[str]:
    con = duckdb.connect()
    map_type_columns = get_map_type_columns(parquet_file)
    flattened_columns = []
    for c in map_type_columns:
        # Read the keys from the first row only; assumes every row has the same keys.
        names_query = f"select map_keys({c}) from '{parquet_file}' limit 1"
        con.execute(names_query)
        results = con.fetchall()
        aliases = []
        # results is a list of rows, each row a tuple, each value a list of keys.
        for k in results:
            for key_names in k:
                for name in key_names:
                    alias = c + "_" + name
                    select_clause = f"{c}['{name}'][1]"
                    aliases.append(select_clause + " as " + alias)
        if len(aliases) > 0:
            flattened_columns.extend(aliases)
    print(
        f"Retrieved {len(map_type_columns)} and converted to {len(flattened_columns)} flattened columns."
    )
    con.close()
    return flattened_columns
Notice the deep nesting in the loop to extract the results from the map_keys() query.
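The generated SQL above pastes column and key names straight into the query, which worked for my data. If your keys might contain quotes, spaces or other odd characters, a more defensive version of the alias step could look like the sketch below. The three helper names are invented for illustration. The quoting follows the usual SQL rules: double quotes around identifiers, single quotes around string literals, each doubled when it appears inside.

```python
import re


def quote_identifier(name: str) -> str:
    """Double-quote a SQL identifier, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def quote_literal(value: str) -> str:
    """Single-quote a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def flattened_alias(column: str, key: str) -> str:
    """Build one `column['key'][1] as "column_key"` select item."""
    # Replace every non-word character in the alias with an underscore.
    alias = re.sub(r"\W", "_", f"{column}_{key}")
    return f"{quote_identifier(column)}[{quote_literal(key)}][1] as {quote_identifier(alias)}"


item = flattened_alias("ds46_dt1", "it's total")
```

One thing to watch: two different keys can collapse to the same alias after the substitution, so a real version should check for duplicates.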
Step 3: Query the data to get a result set
With a select query prepared, issue it and get data back with the column names following the aliases constructed by flatten_map_columns().
This is a simplified version of my Python function.
import os
from pathlib import Path

import duckdb
import pyarrow.parquet as pq


def flatten_data(input_parquet_file: Path, output_parquet_file: Path):
    # Make sure the output destination exists or can be created.
    output_path = Path(os.path.dirname(output_parquet_file))
    output_path.mkdir(parents=True, exist_ok=True)
    # The NHGIS parquet was made by Spark and is all multi-file parquet in
    # subdirectories. Any file will do as a representative sample. There's
    # no file with just the schema.
    input_parquet_file = input_parquet_file / "*.parquet"
    # Helper (not part of DuckDB) that picks one part file matching the glob.
    one_part = get_representative_parquet_part(input_parquet_file)
    flattened_columns = flatten_map_columns(one_part)
    query = (
        "select "
        + ",\n".join(flattened_columns)
        + f" from '{input_parquet_file}'"
    )
    con = duckdb.connect()
    # Fetch the result as an Arrow table and tag it with the new format version.
    new_data = (
        con.execute(query)
        .arrow()
        .replace_schema_metadata({"version_data_format": "2"})
    )
    pq.write_table(new_data, output_parquet_file)
    con.close()
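The get_representative_parquet_part() helper isn’t listed in this post. A minimal stand-in, assuming all it needs to do is return any one file that matches the glob, might look like this. The body is a guess at a reasonable implementation rather than the original code.

```python
from glob import glob
from pathlib import Path


def get_representative_parquet_part(pattern: Path) -> Path:
    """Return the first file matching a glob pattern such as dir/*.parquet."""
    # Sort so the same part is picked on every run.
    matches = sorted(glob(str(pattern)))
    if not matches:
        raise FileNotFoundError(f"No Parquet parts match {pattern}")
    return Path(matches[0])
```

Remember this is only safe when every part shares one schema, which is true of Spark output like mine but not guaranteed in general.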
Step 4: Export query results as Parquet
Notice at the end of the last step’s code where I return the DuckDB results as Arrow and replace the file metadata key ‘version_data_format’. This is so we can later check our Parquet for “Version 2” data format from our extract system. If the extraction software doesn’t see the “version_data_format” the system knows to use the old Spark-based extraction method. Soon enough all our files will have this new format and we can ditch Spark.
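The check on the reading side can be tiny. The sketch below is an illustration, not our extract system’s actual code. It assumes the metadata arrives the way PyArrow exposes schema metadata: a dict with bytes keys and values, or None when there is none. The function name is made up.

```python
from typing import Optional


def uses_flattened_format(metadata: Optional[dict]) -> bool:
    """Decide which extraction path to use from Parquet key-value metadata.

    Returns True only when the 'version_data_format' key is present
    and set to "2"; anything else means the old Spark-based method.
    """
    if not metadata:
        return False
    return metadata.get(b"version_data_format") == b"2"
```

With PyArrow you could feed it pq.read_schema(path).metadata, which reads only the file footer rather than the data.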
I used pyarrow.parquet to write the Arrow data into one physical Parquet file.
Rather than returning as Arrow and saving with PyArrow, I could have saved using Polars with .pl().write_parquet().
Other tools to try
DataFusion might be just as easy for someone familiar with it: You’d issue a query like select * from parquet_file using a DataFusion context.sql() call. Then query the schema from the returned data frame as I showed before. Choose the “Map” type columns and generate the select query to unfold the data similar to the DuckDB solution.
Alternatively, you could query the data, get a lazy data frame and replace the map type columns one by one using .with_column() for every key to add all the unfolded key-values and drop the old columns. It might prove tricky to do this in a streaming or lazy way, which very large files could necessitate.
You might try combining Polars with DataFusion or DuckDB, using each tool’s strengths, since they all speak Arrow.
I’ve only mentioned the most accessible, open source tools that I found to work well. For really large data, Spark is going to be a good choice even though it’s more trouble to set up for more than a local instance. (Which you need to tackle really big data.) If you only need a local Spark instance to process your data, you probably don’t really need Spark in the first place.
Notes for when the preceding advice doesn’t work
Out of thousands of logical Parquet files with “Map” type columns, only one gave me trouble. It took 10GB on disk but was split into 10,200 files. Other larger Parquet files easily converted but this one ate up RAM. It consumed about 500GB at the peak. Fortunately I had access to a server with 1TB of RAM so I used it.
Finally I concluded that DuckDB has some overhead for each partial file; it must check each one for the schema and open every file. These particular files have hundreds of “Map” columns with thousands of total keys so each schema on each file is quite large. In addition, this file uncompresses to a much larger size. Once the file was totally in memory it took just a few minutes to save into the simpler, flattened structure.
If you have a lot of data like this I suggest converting each file by itself, then consolidating into one afterward. Or, use DataFusion to read the whole thing in with a lazy data frame and save as one Parquet file, then convert to a flattened structure afterward. If you still run out of RAM, say because the file is hundreds of gigabytes in total, the best solutions are:
- Use Spark on a Spark cluster. Try to control the number of parts the file gets saved into.
- Write a custom Rust or Go program to read record by record and convert into one file. You don’t have to read everything into memory or store the whole output in a buffer, you can do it by row groups. I’d avoid C++ even though the Arrow and Parquet libraries are written in C++; it’s a pain.
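A middle ground between converting one file at a time and converting all 10,200 at once is to work in batches of parts, so each query only opens a bounded number of files. The grouping itself is simple. Here is a sketch with an invented batches() helper and made-up part names; I have not measured what batch size keeps memory reasonable.

```python
from pathlib import Path
from typing import Iterator


def batches(paths: list[Path], size: int) -> Iterator[list[Path]]:
    """Yield consecutive groups of at most `size` paths."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(paths), size):
        yield paths[start : start + size]


# Hypothetical part names; a real run would list the directory instead.
parts = [Path(f"part-{n:05d}.parquet") for n in range(10)]
groups = list(batches(parts, 4))
```

Each group can then be flattened into its own intermediate Parquet file and the intermediates merged at the end. DuckDB’s read_parquet() accepts a list of file names, so a group can go into the from clause without a glob.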