This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e4ed29  Use `write.parquet.compression-{codec,level}` (#358)
9e4ed29 is described below

commit 9e4ed296de7448e036dedc227b71dc51214fb679
Author: Jonas Haag <[email protected]>
AuthorDate: Mon Feb 5 10:40:43 2024 +0100

    Use `write.parquet.compression-{codec,level}` (#358)
    
    * Use `write.parquet.compression-{codec,level}`
    
    * Cleanup
    
    * Review feedback
    
    * Review feedback
    
    * Review feedback
    
    * Update pyiceberg/io/pyarrow.py
    
    Co-authored-by: Fokko Driesprong <[email protected]>
    
    * Fixup
    
    * Fixup
    
    * Fixup
    
    ---------
    
    Co-authored-by: Fokko Driesprong <[email protected]>
---
 pyiceberg/io/pyarrow.py          |  44 +++++--
 tests/integration/test_reads.py  |   6 +-
 tests/integration/test_writes.py | 254 ++++++++++++++++++++-------------------
 3 files changed, 173 insertions(+), 131 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 3e056b1..f7c0aef 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -26,6 +26,7 @@ with the pyarrow library.
 from __future__ import annotations
 
 import concurrent.futures
+import fnmatch
 import itertools
 import logging
 import os
@@ -1720,13 +1721,14 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     except StopIteration:
         pass
 
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
     file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
     file_schema = schema_to_pyarrow(table.schema())
 
-    collected_metrics: List[pq.FileMetaData] = []
     fo = table.io.new_output(file_path)
     with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", **parquet_writer_kwargs) as writer:
             writer.write_table(task.df)
 
     data_file = DataFile(
@@ -1745,14 +1747,42 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
         key_metadata=None,
     )
 
-    if len(collected_metrics) != 1:
-        # One file has been written
-        raise ValueError(f"Expected 1 entry, got: {collected_metrics}")
-
     fill_parquet_file_metadata(
         data_file=data_file,
-        parquet_metadata=collected_metrics[0],
+        parquet_metadata=writer.writer.metadata,
         stats_columns=compute_statistics_plan(table.schema(), table.properties),
         parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
     )
     return iter([data_file])
+
+
+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
+    def _get_int(key: str) -> Optional[int]:
+        if value := table_properties.get(key):
+            try:
+                return int(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e
+        else:
+            return None
+
+    for key_pattern in [
+        "write.parquet.row-group-size-bytes",
+        "write.parquet.page-row-limit",
+        "write.parquet.bloom-filter-max-bytes",
+        "write.parquet.bloom-filter-enabled.column.*",
+    ]:
+        if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
+            raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")
+
+    compression_codec = table_properties.get("write.parquet.compression-codec", "zstd")
+    compression_level = _get_int("write.parquet.compression-level")
+    if compression_codec == "uncompressed":
+        compression_codec = "none"
+
+    return {
+        "compression": compression_codec,
+        "compression_level": compression_level,
+        "data_page_size": _get_int("write.parquet.page-size-bytes"),
+        "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes"),
+    }
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 3fc06fb..b35b348 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -224,7 +224,11 @@ def test_ray_all_types(catalog: Catalog) -> None:
 @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
 def test_pyarrow_to_iceberg_all_types(catalog: Catalog) -> None:
     table_test_all_types = catalog.load_table("default.test_all_types")
-    fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password")
+    fs = S3FileSystem(
+        endpoint_override=catalog.properties["s3.endpoint"],
+        access_key=catalog.properties["s3.access-key-id"],
+        secret_key=catalog.properties["s3.secret-access-key"],
+    )
     data_file_paths = [task.file.file_path for task in 
table_test_all_types.scan().plan_files()]
     for data_file_path in data_file_paths:
         uri = urlparse(data_file_path)
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index c8552a2..a65d98f 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -17,12 +17,17 @@
 # pylint:disable=redefined-outer-name
 import uuid
 from datetime import date, datetime
+from typing import Any, Dict, List
+from urllib.parse import urlparse
 
 import pyarrow as pa
+import pyarrow.parquet as pq
 import pytest
+from pyarrow.fs import S3FileSystem
 from pyspark.sql import SparkSession
+from pytest_mock.plugin import MockerFixture
 
-from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.catalog import Catalog, Properties, Table, load_catalog
 from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError
 from pyiceberg.schema import Schema
 from pyiceberg.types import (
@@ -158,142 +163,79 @@ def arrow_table_with_only_nulls(pa_schema: pa.Schema) -> 
pa.Table:
     return pa.Table.from_pylist([{}, {}], schema=pa_schema)
 
 
-@pytest.fixture(scope="session", autouse=True)
-def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: 
pa.Table) -> None:
-    identifier = "default.arrow_table_v1_with_null"
-
+def _create_table(session_catalog: Catalog, identifier: str, properties: 
Properties, data: List[pa.Table]) -> Table:
     try:
         session_catalog.drop_table(identifier=identifier)
     except NoSuchTableError:
         pass
 
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_with_null)
+    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties=properties)
+    for d in data:
+        tbl.append(d)
+
+    return tbl
 
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: 
pa.Table) -> None:
+    identifier = "default.arrow_table_v1_with_null"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 
[arrow_table_with_null])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: 
pa.Table) -> None:
     identifier = "default.arrow_table_v1_without_data"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_without_data)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 
[arrow_table_without_data])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_with_only_nulls(session_catalog: Catalog, 
arrow_table_with_only_nulls: pa.Table) -> None:
     identifier = "default.arrow_table_v1_with_only_nulls"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_with_only_nulls)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 
[arrow_table_with_only_nulls])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_appended_with_null(session_catalog: Catalog, 
arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v1_appended_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
-
-    for _ in range(2):
-        tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 
2 * [arrow_table_with_null])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: 
pa.Table) -> None:
     identifier = "default.arrow_table_v2_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '2'})
-    tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 
[arrow_table_with_null])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: 
pa.Table) -> None:
     identifier = "default.arrow_table_v2_without_data"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '2'})
-    tbl.append(arrow_table_without_data)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 
[arrow_table_without_data])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_with_only_nulls(session_catalog: Catalog, 
arrow_table_with_only_nulls: pa.Table) -> None:
     identifier = "default.arrow_table_v2_with_only_nulls"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '2'})
-    tbl.append(arrow_table_with_only_nulls)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 
[arrow_table_with_only_nulls])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_appended_with_null(session_catalog: Catalog, 
arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v2_appended_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '2'})
-
-    for _ in range(2):
-        tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 
2 * [arrow_table_with_null])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_v2_appended_with_null(session_catalog: Catalog, 
arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v1_v2_appended_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 
[arrow_table_with_null])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
     with tbl.transaction() as tx:
@@ -397,15 +339,7 @@ def test_query_filter_v1_v2_append_null(spark: 
SparkSession, col: str) -> None:
 @pytest.mark.integration
 def test_summaries(spark: SparkSession, session_catalog: Catalog, 
arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_summaries"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
-
-    tbl.append(arrow_table_with_null)
-    tbl.append(arrow_table_with_null)
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 
2 * [arrow_table_with_null])
     tbl.overwrite(arrow_table_with_null)
 
     rows = spark.sql(
@@ -423,39 +357,39 @@ def test_summaries(spark: SparkSession, session_catalog: 
Catalog, arrow_table_wi
 
     assert summaries[0] == {
         'added-data-files': '1',
-        'added-files-size': '5283',
+        'added-files-size': '5437',
         'added-records': '3',
         'total-data-files': '1',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '5283',
+        'total-files-size': '5437',
         'total-position-deletes': '0',
         'total-records': '3',
     }
 
     assert summaries[1] == {
         'added-data-files': '1',
-        'added-files-size': '5283',
+        'added-files-size': '5437',
         'added-records': '3',
         'total-data-files': '2',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '10566',
+        'total-files-size': '10874',
         'total-position-deletes': '0',
         'total-records': '6',
     }
 
     assert summaries[2] == {
         'added-data-files': '1',
-        'added-files-size': '5283',
+        'added-files-size': '5437',
         'added-records': '3',
         'deleted-data-files': '2',
         'deleted-records': '6',
-        'removed-files-size': '10566',
+        'removed-files-size': '10874',
         'total-data-files': '1',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '5283',
+        'total-files-size': '5437',
         'total-position-deletes': '0',
         'total-records': '3',
     }
@@ -464,12 +398,7 @@ def test_summaries(spark: SparkSession, session_catalog: 
Catalog, arrow_table_wi
 @pytest.mark.integration
 def test_data_files(spark: SparkSession, session_catalog: Catalog, 
arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_data_files"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 
[])
 
     tbl.overwrite(arrow_table_with_null)
     # should produce a DELETE entry
@@ -490,15 +419,100 @@ def test_data_files(spark: SparkSession, 
session_catalog: Catalog, arrow_table_w
 
 
 @pytest.mark.integration
-def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, 
arrow_table_with_null: pa.Table) -> None:
-    identifier = "default.arrow_data_files"
+@pytest.mark.parametrize("format_version", ["1", "2"])
+@pytest.mark.parametrize(
+    "properties, expected_compression_name",
+    [
+        # REST catalog uses Zstandard by default: https://github.com/apache/iceberg/pull/8593
+        ({}, "ZSTD"),
+        ({"write.parquet.compression-codec": "uncompressed"}, "UNCOMPRESSED"),
+        ({"write.parquet.compression-codec": "gzip", 
"write.parquet.compression-level": "1"}, "GZIP"),
+        ({"write.parquet.compression-codec": "zstd", 
"write.parquet.compression-level": "1"}, "ZSTD"),
+        ({"write.parquet.compression-codec": "snappy"}, "SNAPPY"),
+    ],
+)
+def test_write_parquet_compression_properties(
+    spark: SparkSession,
+    session_catalog: Catalog,
+    arrow_table_with_null: pa.Table,
+    format_version: str,
+    properties: Dict[str, Any],
+    expected_compression_name: str,
+) -> None:
+    identifier = "default.write_parquet_compression_properties"
 
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
+    tbl = _create_table(session_catalog, identifier, {"format-version": 
format_version, **properties}, [arrow_table_with_null])
 
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
+    data_file_paths = [task.file.file_path for task in tbl.scan().plan_files()]
+
+    fs = S3FileSystem(
+        endpoint_override=session_catalog.properties["s3.endpoint"],
+        access_key=session_catalog.properties["s3.access-key-id"],
+        secret_key=session_catalog.properties["s3.secret-access-key"],
+    )
+    uri = urlparse(data_file_paths[0])
+    with fs.open_input_file(f"{uri.netloc}{uri.path}") as f:
+        parquet_metadata = pq.read_metadata(f)
+        compression = parquet_metadata.row_group(0).column(0).compression
+
+    assert compression == expected_compression_name
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    "properties, expected_kwargs",
+    [
+        ({"write.parquet.page-size-bytes": "42"}, {"data_page_size": 42}),
+        ({"write.parquet.dict-size-bytes": "42"}, 
{"dictionary_pagesize_limit": 42}),
+    ],
+)
+def test_write_parquet_other_properties(
+    mocker: MockerFixture,
+    spark: SparkSession,
+    session_catalog: Catalog,
+    arrow_table_with_null: pa.Table,
+    properties: Dict[str, Any],
+    expected_kwargs: Dict[str, Any],
+) -> None:
+    print(type(mocker))
+    identifier = "default.test_write_parquet_other_properties"
+
+    # The properties we test cannot be checked on the resulting Parquet file, so we spy on the ParquetWriter call instead
+    ParquetWriter = mocker.spy(pq, "ParquetWriter")
+    _create_table(session_catalog, identifier, properties, 
[arrow_table_with_null])
+
+    call_kwargs = ParquetWriter.call_args[1]
+    for key, value in expected_kwargs.items():
+        assert call_kwargs.get(key) == value
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    "properties",
+    [
+        {"write.parquet.row-group-size-bytes": "42"},
+        {"write.parquet.page-row-limit": "42"},
+        {"write.parquet.bloom-filter-enabled.column.bool": "42"},
+        {"write.parquet.bloom-filter-max-bytes": "42"},
+    ],
+)
+def test_write_parquet_unsupported_properties(
+    spark: SparkSession,
+    session_catalog: Catalog,
+    arrow_table_with_null: pa.Table,
+    properties: Dict[str, str],
+) -> None:
+    identifier = "default.write_parquet_unsupported_properties"
+
+    tbl = _create_table(session_catalog, identifier, properties, [])
+    with pytest.raises(NotImplementedError):
+        tbl.append(arrow_table_with_null)
+
+
+@pytest.mark.integration
+def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, 
arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.arrow_data_files"
+    tbl = _create_table(session_catalog, identifier, {'format-version': '1'}, 
[])
 
     with pytest.raises(ValueError, match="Expected PyArrow table, got: not a df"):
         tbl.overwrite("not a df")
@@ -512,15 +526,9 @@ def test_summaries_with_only_nulls(
     spark: SparkSession, session_catalog: Catalog, arrow_table_without_data: 
pa.Table, arrow_table_with_only_nulls: pa.Table
 ) -> None:
     identifier = "default.arrow_table_summaries_with_only_nulls"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
-
-    tbl.append(arrow_table_without_data)
-    tbl.append(arrow_table_with_only_nulls)
+    tbl = _create_table(
+        session_catalog, identifier, {'format-version': '1'}, 
[arrow_table_without_data, arrow_table_with_only_nulls]
+    )
     tbl.overwrite(arrow_table_without_data)
 
     rows = spark.sql(
@@ -547,12 +555,12 @@ def test_summaries_with_only_nulls(
 
     assert summaries[1] == {
         'added-data-files': '1',
-        'added-files-size': '4045',
+        'added-files-size': '4217',
         'added-records': '2',
         'total-data-files': '1',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '4045',
+        'total-files-size': '4217',
         'total-position-deletes': '0',
         'total-records': '2',
     }

Reply via email to