timsaucer commented on code in PR #1123:
URL: https://github.com/apache/datafusion-python/pull/1123#discussion_r2145455758
##########
python/datafusion/dataframe.py:
##########
@@ -704,38 +694,135 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
def write_parquet(
self,
path: str | pathlib.Path,
- compression: Union[str, Compression] = Compression.ZSTD,
- compression_level: int | None = None,
+ data_pagesize_limit: int = 1024 * 1024,
+ write_batch_size: int = 1024,
+ writer_version: str = "1.0",
+ skip_arrow_metadata: bool = False,
+ compression: Optional[str] = "zstd(3)",
+ dictionary_enabled: Optional[bool] = True,
+ dictionary_page_size_limit: int = 1024 * 1024,
+ statistics_enabled: Optional[str] = "page",
+ max_row_group_size: int = 1024 * 1024,
+ created_by: str = "datafusion-python",
+ column_index_truncate_length: Optional[int] = 64,
+ statistics_truncate_length: Optional[int] = None,
+ data_page_row_count_limit: int = 20_000,
+ encoding: Optional[str] = None,
+ bloom_filter_on_write: bool = False,
+ bloom_filter_fpp: Optional[float] = None,
+ bloom_filter_ndv: Optional[int] = None,
+ allow_single_file_parallelism: bool = True,
+ maximum_parallel_row_group_writers: int = 1,
+ maximum_buffered_record_batches_per_stream: int = 2,
+ column_specific_options: Optional[dict[str, ParquetColumnOptions]] = None,
Review Comment:
I think the full Parquet writer options are going to be less commonly used by most users. I would suggest we have two different versions of the `write_parquet` signature: one that keeps the simple existing compression parameters, and one that looks like
```
def write_parquet(self, path: str | pathlib.Path, options: ParquetWriterOptions)
```
And then we have a `ParquetWriterOptions` class that takes this large set of initialization options.
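For illustration, a rough sketch of that split could look like the following. The `write_parquet_with_options` name and the exact `ParquetWriterOptions` fields here are only illustrative (the fields are a subset of the parameters in the diff above), not a settled API:
```
from __future__ import annotations

import pathlib
from dataclasses import dataclass
from typing import Optional


@dataclass
class ParquetWriterOptions:
    """Bundles the advanced Parquet writer settings (illustrative subset)."""

    data_pagesize_limit: int = 1024 * 1024
    write_batch_size: int = 1024
    writer_version: str = "1.0"
    compression: Optional[str] = "zstd(3)"
    dictionary_enabled: Optional[bool] = True
    statistics_enabled: Optional[str] = "page"
    max_row_group_size: int = 1024 * 1024
    created_by: str = "datafusion-python"
    # ... plus the remaining knobs from the signature above


class DataFrame:
    def write_parquet(
        self,
        path: str | pathlib.Path,
        # stands in for the existing simple compression/compression_level parameters
        compression: str = "zstd(3)",
        compression_level: int | None = None,
    ) -> None:
        """Simple variant: keeps only the common compression parameters."""
        ...

    def write_parquet_with_options(
        self, path: str | pathlib.Path, options: ParquetWriterOptions
    ) -> None:
        """Advanced variant: every writer setting bundled in ParquetWriterOptions."""
        ...
```
That keeps the common path unchanged for existing callers while giving the full set of writer knobs a single, discoverable home.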
##########
python/tests/test_dataframe.py:
##########
@@ -1560,40 +1588,323 @@ def test_write_compressed_parquet(df, tmp_path, compression, compression_level):
@pytest.mark.parametrize(
- ("compression", "compression_level"),
- [("gzip", 12), ("brotli", 15), ("zstd", 23), ("wrong", 12)],
+ "compression",
+ ["gzip(12)", "brotli(15)", "zstd(23)"],
)
-def test_write_compressed_parquet_wrong_compression_level(
- df, tmp_path, compression, compression_level
-):
+def test_write_compressed_parquet_wrong_compression_level(df, tmp_path, compression):
path = tmp_path
- with pytest.raises(ValueError):
- df.write_parquet(
- str(path),
- compression=compression,
- compression_level=compression_level,
- )
+ with pytest.raises(Exception, match=r"valid compression range .*? exceeded."):
+ df.write_parquet(str(path), compression=compression)
[email protected]("compression", ["wrong"])
[email protected]("compression", ["wrong", "wrong(12)"])
def test_write_compressed_parquet_invalid_compression(df, tmp_path, compression):
path = tmp_path
- with pytest.raises(ValueError):
+ with pytest.raises(Exception, match="Unknown or unsupported parquet
compression"):
df.write_parquet(str(path), compression=compression)
-# not testing lzo because it it not implemented yet
-# https://github.com/apache/arrow-rs/issues/6970
[email protected]("compression", ["zstd", "brotli", "gzip"])
-def test_write_compressed_parquet_default_compression_level(df, tmp_path, compression):
- # Test write_parquet with zstd, brotli, gzip default compression level,
- # ie don't specify compression level
- # should complete without error
- path = tmp_path
[email protected](
+ ("writer_version", "format_version"),
+ [("1.0", "1.0"), ("2.0", "2.6"), (None, "1.0")],
+)
+def test_write_parquet_writer_version(df, tmp_path, writer_version, format_version):
+ """Test the Parquet writer version. Note that writer_version=2.0 results in
+ format_version=2.6"""
+ if writer_version is None:
+ df.write_parquet(tmp_path)
+ else:
+ df.write_parquet(tmp_path, writer_version=writer_version)
- df.write_parquet(str(path), compression=compression)
+ for file in tmp_path.rglob("*.parquet"):
+ parquet = pq.ParquetFile(file)
+ metadata = parquet.metadata.to_dict()
+ assert metadata["format_version"] == format_version
+
+
[email protected]("writer_version", ["1.2.3", "custom-version", "0"])
+def test_write_parquet_wrong_writer_version(df, tmp_path, writer_version):
+ """Test that invalid writer versions in Parquet throw an exception."""
+ with pytest.raises(
+ Exception, match="Unknown or unsupported parquet writer version"
+ ):
+ df.write_parquet(tmp_path, writer_version=writer_version)
+
+
[email protected]("dictionary_enabled", [True, False, None])
+def test_write_parquet_dictionary_enabled(df, tmp_path, dictionary_enabled):
+ """Test enabling/disabling the dictionaries in Parquet."""
+ df.write_parquet(tmp_path, dictionary_enabled=dictionary_enabled)
+ # by default, the dictionary is enabled, so None results in True
+ result = dictionary_enabled if dictionary_enabled is not None else True
+
+ for file in tmp_path.rglob("*.parquet"):
+ parquet = pq.ParquetFile(file)
+ metadata = parquet.metadata.to_dict()
+
+ for row_group in metadata["row_groups"]:
+ for col in row_group["columns"]:
+ assert col["has_dictionary_page"] == result
+
+
[email protected](
+ ("statistics_enabled", "has_statistics"),
+ [("page", True), ("chunk", True), ("none", False), (None, True)],
+)
+def test_write_parquet_statistics_enabled(
+ df, tmp_path, statistics_enabled, has_statistics
+):
+ """Test configuring the statistics in Parquet. In pyarrow we can only
check for
+ column-level statistics, so "page" and "chunk" are tested in the same
way."""
+ df.write_parquet(tmp_path, statistics_enabled=statistics_enabled)
+
+ for file in tmp_path.rglob("*.parquet"):
+ parquet = pq.ParquetFile(file)
+ metadata = parquet.metadata.to_dict()
+
+ for row_group in metadata["row_groups"]:
+ for col in row_group["columns"]:
+ if has_statistics:
+ assert col["statistics"] is not None
+ else:
+ assert col["statistics"] is None
+
+
[email protected]("max_row_group_size", [1000, 5000, 10000, 100000])
+def test_write_parquet_max_row_group_size(large_df, tmp_path, max_row_group_size):
+ """Test configuring the max number of rows per group in Parquet. These
test cases
+ guarantee that the number of rows for each row group is
max_row_group_size, given
+ the total number of rows is a multiple of max_row_group_size."""
+ large_df.write_parquet(tmp_path, max_row_group_size=max_row_group_size)
+
+ for file in tmp_path.rglob("*.parquet"):
+ parquet = pq.ParquetFile(file)
+ metadata = parquet.metadata.to_dict()
+ for row_group in metadata["row_groups"]:
+ assert row_group["num_rows"] == max_row_group_size
+
+
[email protected]("created_by", ["datafusion", "datafusion-python",
"custom"])
+def test_write_parquet_created_by(df, tmp_path, created_by):
+ """Test configuring the created by metadata in Parquet."""
+ df.write_parquet(tmp_path, created_by=created_by)
+
+ for file in tmp_path.rglob("*.parquet"):
+ parquet = pq.ParquetFile(file)
+ metadata = parquet.metadata.to_dict()
+ assert metadata["created_by"] == created_by
+
+
[email protected]("statistics_truncate_length", [5, 25, 50])
+def test_write_parquet_statistics_truncate_length(
+ df, tmp_path, statistics_truncate_length
+):
+ """Test configuring the truncate limit in Parquet's row-group-level
statistics."""
+ ctx = SessionContext()
+ data = {
+ "a": [
+ "a_the_quick_brown_fox_jumps_over_the_lazy_dog",
+ "m_the_quick_brown_fox_jumps_over_the_lazy_dog",
+ "z_the_quick_brown_fox_jumps_over_the_lazy_dog",
+ ],
+ "b": ["a_smaller", "m_smaller", "z_smaller"],
+ }
+ df = ctx.from_arrow(pa.record_batch(data))
+ df.write_parquet(tmp_path, statistics_truncate_length=statistics_truncate_length)
+
+ for file in tmp_path.rglob("*.parquet"):
+ parquet = pq.ParquetFile(file)
+ metadata = parquet.metadata.to_dict()
+
+ for row_group in metadata["row_groups"]:
+ for col in row_group["columns"]:
+ statistics = col["statistics"]
+ assert len(statistics["min"]) <= statistics_truncate_length
+ assert len(statistics["max"]) <= statistics_truncate_length
+
+
+def test_write_parquet_default_encoding(tmp_path):
+ """Test that, by default, Parquet files are written with dictionary
encoding.
+ Note that dictionary encoding is not used for boolean values, so it is not
tested
+ here."""
+ ctx = SessionContext()
+ data = {
+ "a": [1, 2, 3],
+ "b": ["1", "2", "3"],
+ "c": [1.01, 2.02, 3.03],
+ }
+ df = ctx.from_arrow(pa.record_batch(data))
+ df.write_parquet(tmp_path)
+
+ for file in tmp_path.rglob("*.parquet"):
+ parquet = pq.ParquetFile(file)
+ metadata = parquet.metadata.to_dict()
+
+ for row_group in metadata["row_groups"]:
+ for col in row_group["columns"]:
+ assert col["encodings"] == ("PLAIN", "RLE", "RLE_DICTIONARY")
+
+
[email protected](
+ ("encoding", "data_types", "result"),
+ [
+ ("plain", ["int", "float", "str", "bool"], ("PLAIN", "RLE")),
+ ("rle", ["bool"], ("RLE",)),
+ ("delta_binary_packed", ["int"], ("RLE", "DELTA_BINARY_PACKED")),
+ ("delta_length_byte_array", ["str"], ("RLE",
"DELTA_LENGTH_BYTE_ARRAY")),
+ ("delta_byte_array", ["str"], ("RLE", "DELTA_BYTE_ARRAY")),
+ ("byte_stream_split", ["int", "float"], ("RLE", "BYTE_STREAM_SPLIT")),
+ ],
+)
+def test_write_parquet_encoding(tmp_path, encoding, data_types, result):
+ """Test different encodings in Parquet in their respective support column
types."""
+ ctx = SessionContext()
+
+ data = {}
+ for data_type in data_types:
+ match data_type:
+ case "int":
+ data["int"] = [1, 2, 3]
+ case "float":
+ data["float"] = [1.01, 2.02, 3.03]
+ case "str":
+ data["str"] = ["a", "b", "c"]
+ case "bool":
+ data["bool"] = [True, False, True]
Review Comment:
This code failed for me because our minimum supported Python version is 3.9, and `match` statements were only introduced in 3.10. Python 3.9 doesn't reach end of life until Oct 2025.
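For reference, a Python 3.9-compatible rewrite of that block only needs an `if`/`elif` chain in place of `match`; a sketch with the same behavior as the snippet above:
```
data = {}
for data_type in data_types:
    # data_types is supplied by the @pytest.mark.parametrize above
    if data_type == "int":
        data["int"] = [1, 2, 3]
    elif data_type == "float":
        data["float"] = [1.01, 2.02, 3.03]
    elif data_type == "str":
        data["str"] = ["a", "b", "c"]
    elif data_type == "bool":
        data["bool"] = [True, False, True]
```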