This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 9e4ed29 Use `write.parquet.compression-{codec,level}` (#358)
9e4ed29 is described below
commit 9e4ed296de7448e036dedc227b71dc51214fb679
Author: Jonas Haag <[email protected]>
AuthorDate: Mon Feb 5 10:40:43 2024 +0100
Use `write.parquet.compression-{codec,level}` (#358)
* Use `write.parquet.compression-{codec,level}`
* Cleanup
* Review feedback
* Review feedback
* Review feedback
* Update pyiceberg/io/pyarrow.py
Co-authored-by: Fokko Driesprong <[email protected]>
* Fixup
* Fixup
* Fixup
---------
Co-authored-by: Fokko Driesprong <[email protected]>
---
pyiceberg/io/pyarrow.py | 44 +++++--
tests/integration/test_reads.py | 6 +-
tests/integration/test_writes.py | 254 ++++++++++++++++++++-------------------
3 files changed, 173 insertions(+), 131 deletions(-)
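
In short, this change wires the standard Iceberg table properties
write.parquet.compression-codec (now defaulting to zstd) and
write.parquet.compression-level into PyIceberg's Parquet writer. A minimal
usage sketch (not taken from the commit; the catalog name and table
identifier are illustrative and assume a pre-configured catalog):

    import pyarrow as pa

    from pyiceberg.catalog import load_catalog
    from pyiceberg.schema import Schema
    from pyiceberg.types import LongType, NestedField

    catalog = load_catalog("default")  # hypothetical, pre-configured catalog
    schema = Schema(NestedField(field_id=1, name="n", field_type=LongType(), required=False))
    tbl = catalog.create_table(
        "default.zstd_example",  # hypothetical identifier
        schema=schema,
        properties={
            # Parsed by the new _get_parquet_writer_kwargs() below;
            # the level is optional and must parse as an integer.
            "write.parquet.compression-codec": "zstd",
            "write.parquet.compression-level": "3",
        },
    )
    tbl.append(pa.table({"n": pa.array([1, 2, 3], type=pa.int64())}))
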
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 3e056b1..f7c0aef 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -26,6 +26,7 @@ with the pyarrow library.
 from __future__ import annotations
 
 import concurrent.futures
+import fnmatch
 import itertools
 import logging
 import os
@@ -1720,13 +1721,14 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     except StopIteration:
         pass
 
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
     file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
     file_schema = schema_to_pyarrow(table.schema())
 
-    collected_metrics: List[pq.FileMetaData] = []
     fo = table.io.new_output(file_path)
     with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", **parquet_writer_kwargs) as writer:
             writer.write_table(task.df)
 
     data_file = DataFile(
@@ -1745,14 +1747,42 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
         key_metadata=None,
     )
 
-    if len(collected_metrics) != 1:
-        # One file has been written
-        raise ValueError(f"Expected 1 entry, got: {collected_metrics}")
-
     fill_parquet_file_metadata(
         data_file=data_file,
-        parquet_metadata=collected_metrics[0],
+        parquet_metadata=writer.writer.metadata,
         stats_columns=compute_statistics_plan(table.schema(), table.properties),
         parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
     )
     return iter([data_file])
+
+
+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
+    def _get_int(key: str) -> Optional[int]:
+        if value := table_properties.get(key):
+            try:
+                return int(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e
+        else:
+            return None
+
+    for key_pattern in [
+        "write.parquet.row-group-size-bytes",
+        "write.parquet.page-row-limit",
+        "write.parquet.bloom-filter-max-bytes",
+        "write.parquet.bloom-filter-enabled.column.*",
+    ]:
+        if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
+            raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")
+
+    compression_codec = table_properties.get("write.parquet.compression-codec", "zstd")
+    compression_level = _get_int("write.parquet.compression-level")
+    if compression_codec == "uncompressed":
+        compression_codec = "none"
+
+    return {
+        "compression": compression_codec,
+        "compression_level": compression_level,
+        "data_page_size": _get_int("write.parquet.page-size-bytes"),
+        "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes"),
+    }
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 3fc06fb..b35b348 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -224,7 +224,11 @@ def test_ray_all_types(catalog: Catalog) -> None:
 @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
 def test_pyarrow_to_iceberg_all_types(catalog: Catalog) -> None:
     table_test_all_types = catalog.load_table("default.test_all_types")
-    fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password")
+    fs = S3FileSystem(
+        endpoint_override=catalog.properties["s3.endpoint"],
+        access_key=catalog.properties["s3.access-key-id"],
+        secret_key=catalog.properties["s3.secret-access-key"],
+    )
     data_file_paths = [task.file.file_path for task in table_test_all_types.scan().plan_files()]
     for data_file_path in data_file_paths:
         uri = urlparse(data_file_path)
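
The catalog-driven S3FileSystem construction above recurs in the write tests
below. Note that pyarrow expects "bucket/key" paths rather than full s3://
URIs, which is why each data file path goes through urlparse and is rejoined
from netloc and path. An isolated sketch, using the docker-compose defaults
this test previously hardcoded and a hypothetical data file path:

    import pyarrow.parquet as pq
    from pyarrow.fs import S3FileSystem
    from urllib.parse import urlparse

    fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password")
    uri = urlparse("s3://warehouse/default/test/data/00000-0.parquet")  # hypothetical path
    # "s3://warehouse/default/..." becomes "warehouse/default/..." for pyarrow:
    with fs.open_input_file(f"{uri.netloc}{uri.path}") as f:
        print(pq.read_metadata(f))
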
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index c8552a2..a65d98f 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -17,12 +17,17 @@
 # pylint:disable=redefined-outer-name
 import uuid
 from datetime import date, datetime
+from typing import Any, Dict, List
+from urllib.parse import urlparse
 
 import pyarrow as pa
+import pyarrow.parquet as pq
 import pytest
+from pyarrow.fs import S3FileSystem
 from pyspark.sql import SparkSession
+from pytest_mock.plugin import MockerFixture
 
-from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.catalog import Catalog, Properties, Table, load_catalog
 from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError
 from pyiceberg.schema import Schema
 from pyiceberg.types import (
@@ -158,142 +163,79 @@ def arrow_table_with_only_nulls(pa_schema: pa.Schema) -> pa.Table:
     return pa.Table.from_pylist([{}, {}], schema=pa_schema)
 
 
[email protected](scope="session", autouse=True)
-def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
-    identifier = "default.arrow_table_v1_with_null"
-
+def _create_table(session_catalog: Catalog, identifier: str, properties: Properties, data: List[pa.Table]) -> Table:
     try:
         session_catalog.drop_table(identifier=identifier)
     except NoSuchTableError:
         pass
 
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_with_null)
+    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties)
+    for d in data:
+        tbl.append(d)
+
+    return tbl
+
+
[email protected](scope="session", autouse=True)
+def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.arrow_table_v1_with_null"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
     identifier = "default.arrow_table_v1_without_data"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_without_data)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_without_data])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
     identifier = "default.arrow_table_v1_with_only_nulls"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_with_only_nulls)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_only_nulls])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v1_appended_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-
-    for _ in range(2):
-        tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v2_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
-    tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
     identifier = "default.arrow_table_v2_without_data"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
-    tbl.append(arrow_table_without_data)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_without_data])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
     identifier = "default.arrow_table_v2_with_only_nulls"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
-    tbl.append(arrow_table_with_only_nulls)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_only_nulls])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v2_appended_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
-
-    for _ in range(2):
-        tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v1_v2_appended_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
     with tbl.transaction() as tx:
@@ -397,15 +339,7 @@ def test_query_filter_v1_v2_append_null(spark: SparkSession, col: str) -> None:
 @pytest.mark.integration
 def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_summaries"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-
-    tbl.append(arrow_table_with_null)
-    tbl.append(arrow_table_with_null)
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null])
     tbl.overwrite(arrow_table_with_null)
 
     rows = spark.sql(
@@ -423,39 +357,39 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
 
     assert summaries[0] == {
         'added-data-files': '1',
-        'added-files-size': '5283',
+        'added-files-size': '5437',
         'added-records': '3',
         'total-data-files': '1',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '5283',
+        'total-files-size': '5437',
         'total-position-deletes': '0',
         'total-records': '3',
     }
 
     assert summaries[1] == {
         'added-data-files': '1',
-        'added-files-size': '5283',
+        'added-files-size': '5437',
         'added-records': '3',
         'total-data-files': '2',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '10566',
+        'total-files-size': '10874',
         'total-position-deletes': '0',
         'total-records': '6',
     }
 
     assert summaries[2] == {
         'added-data-files': '1',
-        'added-files-size': '5283',
+        'added-files-size': '5437',
         'added-records': '3',
         'deleted-data-files': '2',
         'deleted-records': '6',
-        'removed-files-size': '10566',
+        'removed-files-size': '10874',
         'total-data-files': '1',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '5283',
+        'total-files-size': '5437',
         'total-position-deletes': '0',
         'total-records': '3',
     }
@@ -464,12 +398,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
 @pytest.mark.integration
 def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_data_files"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [])
 
     tbl.overwrite(arrow_table_with_null)
     # should produce a DELETE entry
@@ -490,15 +419,100 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w
 
 
 @pytest.mark.integration
-def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
-    identifier = "default.arrow_data_files"
[email protected]("format_version", ["1", "2"])
[email protected](
+    "properties, expected_compression_name",
+    [
+        # REST catalog uses Zstandard by default: https://github.com/apache/iceberg/pull/8593
+        ({}, "ZSTD"),
+        ({"write.parquet.compression-codec": "uncompressed"}, "UNCOMPRESSED"),
+        ({"write.parquet.compression-codec": "gzip", "write.parquet.compression-level": "1"}, "GZIP"),
+        ({"write.parquet.compression-codec": "zstd", "write.parquet.compression-level": "1"}, "ZSTD"),
+        ({"write.parquet.compression-codec": "snappy"}, "SNAPPY"),
+    ],
+)
+def test_write_parquet_compression_properties(
+    spark: SparkSession,
+    session_catalog: Catalog,
+    arrow_table_with_null: pa.Table,
+    format_version: str,
+    properties: Dict[str, Any],
+    expected_compression_name: str,
+) -> None:
+    identifier = "default.write_parquet_compression_properties"
 
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
+    tbl = _create_table(session_catalog, identifier, {"format-version": format_version, **properties}, [arrow_table_with_null])
 
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
+    data_file_paths = [task.file.file_path for task in tbl.scan().plan_files()]
+
+    fs = S3FileSystem(
+        endpoint_override=session_catalog.properties["s3.endpoint"],
+        access_key=session_catalog.properties["s3.access-key-id"],
+        secret_key=session_catalog.properties["s3.secret-access-key"],
+    )
+    uri = urlparse(data_file_paths[0])
+    with fs.open_input_file(f"{uri.netloc}{uri.path}") as f:
+        parquet_metadata = pq.read_metadata(f)
+        compression = parquet_metadata.row_group(0).column(0).compression
+
+    assert compression == expected_compression_name
+
+
[email protected]
[email protected](
+    "properties, expected_kwargs",
+    [
+        ({"write.parquet.page-size-bytes": "42"}, {"data_page_size": 42}),
+        ({"write.parquet.dict-size-bytes": "42"}, {"dictionary_pagesize_limit": 42}),
+    ],
+)
+def test_write_parquet_other_properties(
+    mocker: MockerFixture,
+    spark: SparkSession,
+    session_catalog: Catalog,
+    arrow_table_with_null: pa.Table,
+    properties: Dict[str, Any],
+    expected_kwargs: Dict[str, Any],
+) -> None:
+    print(type(mocker))
+    identifier = "default.test_write_parquet_other_properties"
+
+    # The properties we test cannot be checked on the resulting Parquet file, so we spy on the ParquetWriter call instead
+    ParquetWriter = mocker.spy(pq, "ParquetWriter")
+    _create_table(session_catalog, identifier, properties, [arrow_table_with_null])
+
+    call_kwargs = ParquetWriter.call_args[1]
+    for key, value in expected_kwargs.items():
+        assert call_kwargs.get(key) == value
+
+
[email protected]
[email protected](
+    "properties",
+    [
+        {"write.parquet.row-group-size-bytes": "42"},
+        {"write.parquet.page-row-limit": "42"},
+        {"write.parquet.bloom-filter-enabled.column.bool": "42"},
+        {"write.parquet.bloom-filter-max-bytes": "42"},
+    ],
+)
+def test_write_parquet_unsupported_properties(
+    spark: SparkSession,
+    session_catalog: Catalog,
+    arrow_table_with_null: pa.Table,
+    properties: Dict[str, str],
+) -> None:
+    identifier = "default.write_parquet_unsupported_properties"
+
+    tbl = _create_table(session_catalog, identifier, properties, [])
+    with pytest.raises(NotImplementedError):
+        tbl.append(arrow_table_with_null)
+
+
[email protected]
+def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.arrow_data_files"
+    tbl = _create_table(session_catalog, identifier, {'format-version': '1'}, [])
 
     with pytest.raises(ValueError, match="Expected PyArrow table, got: not a df"):
         tbl.overwrite("not a df")
@@ -512,15 +526,9 @@ def test_summaries_with_only_nulls(
     spark: SparkSession, session_catalog: Catalog, arrow_table_without_data: pa.Table, arrow_table_with_only_nulls: pa.Table
 ) -> None:
     identifier = "default.arrow_table_summaries_with_only_nulls"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-
-    tbl.append(arrow_table_without_data)
-    tbl.append(arrow_table_with_only_nulls)
+    tbl = _create_table(
+        session_catalog, identifier, {'format-version': '1'}, [arrow_table_without_data, arrow_table_with_only_nulls]
+    )
     tbl.overwrite(arrow_table_without_data)
 
     rows = spark.sql(
@@ -547,12 +555,12 @@ def test_summaries_with_only_nulls(
 
     assert summaries[1] == {
         'added-data-files': '1',
-        'added-files-size': '4045',
+        'added-files-size': '4217',
         'added-records': '2',
         'total-data-files': '1',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '4045',
+        'total-files-size': '4217',
         'total-position-deletes': '0',
         'total-records': '2',
     }