This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 4b7e9182 Construction of filenames for partitioned writes (#453)
4b7e9182 is described below
commit 4b7e9182a15c1da60c2d8668fabe063f8ab6b5b2
Author: Adrian Qin <[email protected]>
AuthorDate: Thu Feb 29 16:14:26 2024 -0500
Construction of filenames for partitioned writes (#453)
* PartitionKey Class And Tests
* fix linting; add decimal input transform test
* fix bool to path lower case; fix timestamptz tests; other pr comments
* clean up
* add uuid partition type
* clean up; rename ambiguous function name
---
pyiceberg/partitioning.py | 101 +++-
pyiceberg/transforms.py | 5 +
tests/conftest.py | 51 +-
tests/integration/test_partitioning_key.py | 772 +++++++++++++++++++++++++++++
tests/integration/test_writes.py | 48 --
5 files changed, 925 insertions(+), 52 deletions(-)
diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py
index cd5a957b..6fa02862 100644
--- a/pyiceberg/partitioning.py
+++ b/pyiceberg/partitioning.py
@@ -16,9 +16,21 @@
# under the License.
from __future__ import annotations
+import uuid
from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import date, datetime
from functools import cached_property, singledispatch
-from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar
+from typing import (
+ Any,
+ Dict,
+ Generic,
+ List,
+ Optional,
+ Tuple,
+ TypeVar,
+)
+from urllib.parse import quote
from pydantic import (
BeforeValidator,
@@ -41,8 +53,18 @@ from pyiceberg.transforms import (
YearTransform,
parse_transform,
)
-from pyiceberg.typedef import IcebergBaseModel
-from pyiceberg.types import NestedField, StructType
+from pyiceberg.typedef import IcebergBaseModel, Record
+from pyiceberg.types import (
+ DateType,
+ IcebergType,
+ NestedField,
+ PrimitiveType,
+ StructType,
+ TimestampType,
+ TimestamptzType,
+ UUIDType,
+)
+from pyiceberg.utils.datetime import date_to_days, datetime_to_micros
INITIAL_PARTITION_SPEC_ID = 0
PARTITION_FIELD_ID_START: int = 1000
@@ -199,6 +221,23 @@ class PartitionSpec(IcebergBaseModel):
nested_fields.append(NestedField(field.field_id, field.name,
result_type, required=False))
return StructType(*nested_fields)
+    def partition_to_path(self, data: Record, schema: Schema) -> str:
+        """Render a partition record as a ``name=value/name=value`` path fragment.
+
+        Values in ``data`` are matched to ``self.fields`` by position. Each value is
+        turned into text by its partition field's transform, using the field type
+        taken from ``self.partition_type(schema)``, and is then percent-encoded with
+        no characters treated as safe. Field names are emitted as-is.
+
+        Args:
+            data: Partition record whose fields are read via ``record_fields()``.
+            schema: Table schema used to derive the partition struct type.
+
+        Returns:
+            The ``/``-joined ``name=value`` segments.
+        """
+        result_types = self.partition_type(schema).fields
+
+        segments = []
+        for index, raw in enumerate(data.record_fields()):
+            spec_field = self.fields[index]
+            readable = spec_field.transform.to_human_string(result_types[index].field_type, value=raw)
+            # safe='' so that '/', ':' and '+' inside a value are escaped as well.
+            segments.append(spec_field.name + "=" + quote(readable, safe=''))
+
+        return "/".join(segments)
+
UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
@@ -326,3 +365,59 @@ def _visit_partition_field(schema: Schema, field:
PartitionField, visitor: Parti
return visitor.unknown(field.field_id, source_name, field.source_id,
repr(transform))
else:
raise ValueError(f"Unknown transform {transform}")
+
+
+@dataclass(frozen=True)
+class PartitionFieldValue:
+ # Immutable pairing of a partition field with one raw (not yet transformed) value for it.
+ # field: the partition field the value belongs to.
+ field: PartitionField
+ # value: the Python-level value; PartitionKey.partition converts and transforms it.
+ value: Any
+
+
+@dataclass(frozen=True)
+class PartitionKey:
+    """Raw partition values bundled with the spec and schema needed to interpret them.
+
+    NOTE(review): ``partition`` builds the record in the order of
+    ``raw_partition_field_values``, while ``PartitionSpec.partition_to_path`` pairs
+    record values with spec fields by position - presumably callers pass the raw
+    values in spec order; confirm.
+    """
+
+    raw_partition_field_values: List[PartitionFieldValue]
+    partition_spec: PartitionSpec
+    schema: Schema
+
+    @cached_property
+    def partition(self) -> Record:
+        """Return the partition key, transformed with the Iceberg internal representation as input.
+
+        Each raw value is first converted by ``_to_partition_representation`` for its
+        source field type and then passed through the partition field's transform.
+
+        Raises:
+            ValueError: If a source id does not map to exactly one partition field.
+        """
+        transformed_by_name = {}
+        for entry in self.raw_partition_field_values:
+            source_id = entry.field.source_id
+            candidates = self.partition_spec.source_id_to_fields_map[source_id]
+            if len(candidates) != 1:
+                raise ValueError("partition_fields must contain exactly one field.")
+            spec_field = candidates[0]
+            source_type = self.schema.find_field(name_or_id=source_id).field_type
+            internal_value = _to_partition_representation(source_type, entry.value)
+            apply_transform = spec_field.transform.transform(source_type)
+            transformed_by_name[spec_field.name] = apply_transform(internal_value)
+        return Record(**transformed_by_name)
+
+    def to_path(self) -> str:
+        """Return the partition path fragment for this key, as built by the partition spec."""
+        spec = self.partition_spec
+        return spec.partition_to_path(self.partition, self.schema)
+
+
+@singledispatch
+def _to_partition_representation(type: IcebergType, value: Any) -> Any:
+    """Convert a Python value into the representation fed to partition transforms.
+
+    Dispatch is on the class of ``type``. This fallback is reached only when no
+    overload is registered for that class.
+
+    Args:
+        type: Iceberg type of the source field.
+        value: Raw value for that field.
+
+    Raises:
+        TypeError: Always; the type has no registered conversion.
+    """
+    # Must be raised, not returned: a returned exception object would be passed
+    # on to the transform as if it were a partition value.
+    raise TypeError(f"Unsupported partition field type: {type}")
+
+
+@_to_partition_representation.register(TimestampType)
+@_to_partition_representation.register(TimestamptzType)
+def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]:
+    """Convert a datetime with ``datetime_to_micros``; ``None`` passes through unchanged."""
+    if value is None:
+        return None
+    return datetime_to_micros(value)
+
+
+@_to_partition_representation.register(DateType)
+def _(type: IcebergType, value: Optional[date]) -> Optional[int]:
+    """Convert a date with ``date_to_days``; ``None`` passes through unchanged."""
+    if value is None:
+        return None
+    return date_to_days(value)
+
+
+@_to_partition_representation.register(UUIDType)
+def _(type: IcebergType, value: Optional[uuid.UUID]) -> Optional[str]:
+    """Convert a UUID to its ``str()`` form; ``None`` passes through unchanged."""
+    if value is None:
+        return None
+    return str(value)
+
+
+@_to_partition_representation.register(PrimitiveType)
+def _(type: IcebergType, value: Optional[Any]) -> Optional[Any]:
+    """Pass the value through untouched for primitive types without a more specific overload."""
+    unchanged = value
+    return unchanged
diff --git a/pyiceberg/transforms.py b/pyiceberg/transforms.py
index 9f499a3d..e678a77e 100644
--- a/pyiceberg/transforms.py
+++ b/pyiceberg/transforms.py
@@ -655,6 +655,11 @@ def _(value: int, _type: IcebergType) -> str:
return _int_to_human_string(_type, value)
+@_human_string.register(bool)
+def _(value: bool, _type: IcebergType) -> str:
+    """Render a boolean as lowercase ``true`` / ``false``."""
+    return "true" if value else "false"
+
+
@singledispatch
def _int_to_human_string(_type: IcebergType, value: int) -> str:
return str(value)
diff --git a/tests/conftest.py b/tests/conftest.py
index aa66f360..5b488c70 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -46,9 +46,10 @@ from typing import (
import boto3
import pytest
from moto import mock_aws
+from pyspark.sql import SparkSession
from pyiceberg import schema
-from pyiceberg.catalog import Catalog
+from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.expressions import BoundReference
from pyiceberg.io import (
@@ -1925,3 +1926,51 @@ def table_v2(example_table_metadata_v2: Dict[str, Any])
-> Table:
@pytest.fixture
def bound_reference_str() -> BoundReference[str]:
return BoundReference(field=NestedField(1, "field", StringType(),
required=False), accessor=Accessor(position=0, inner=None))
+
+
+@pytest.fixture(scope="session")
+def session_catalog() -> Catalog:
+    """Session-scoped catalog named ``local``, loaded with REST and S3 settings for localhost."""
+    properties = {
+        "type": "rest",
+        "uri": "http://localhost:8181",
+        "s3.endpoint": "http://localhost:9000",
+        "s3.access-key-id": "admin",
+        "s3.secret-access-key": "password",
+    }
+    return load_catalog("local", **properties)
+
+
+@pytest.fixture(scope="session")
+def spark() -> SparkSession:
+    """Session-scoped Spark session whose default catalog is the ``integration`` Iceberg catalog.
+
+    Sets ``PYSPARK_SUBMIT_ARGS`` and the AWS variables in ``os.environ`` before the
+    session is built. The catalog is configured as a REST catalog at localhost:8181
+    using S3FileIO against localhost:9000.
+    """
+    import importlib.metadata
+    import os
+
+    iceberg_version = "1.4.3"
+    scala_version = "2.12"
+    # Only the major.minor part of the installed pyspark version goes into the artifact name.
+    major_minor = importlib.metadata.version("pyspark").split(".")[:2]
+    spark_version = ".".join(major_minor)
+
+    runtime_pkg = f"org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}"
+    aws_pkg = f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version}"
+    os.environ.update({
+        "PYSPARK_SUBMIT_ARGS": f"--packages {runtime_pkg},{aws_pkg} pyspark-shell",
+        "AWS_REGION": "us-east-1",
+        "AWS_ACCESS_KEY_ID": "admin",
+        "AWS_SECRET_ACCESS_KEY": "password",
+    })
+
+    settings = {
+        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
+        "spark.sql.catalog.integration": "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.integration.catalog-impl": "org.apache.iceberg.rest.RESTCatalog",
+        "spark.sql.catalog.integration.uri": "http://localhost:8181",
+        "spark.sql.catalog.integration.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
+        "spark.sql.catalog.integration.warehouse": "s3://warehouse/wh/",
+        "spark.sql.catalog.integration.s3.endpoint": "http://localhost:9000",
+        "spark.sql.catalog.integration.s3.path-style-access": "true",
+        "spark.sql.defaultCatalog": "integration",
+    }
+    builder = SparkSession.builder.appName("PyIceberg integration test")
+    for option, setting in settings.items():
+        builder = builder.config(option, setting)
+
+    return builder.getOrCreate()
diff --git a/tests/integration/test_partitioning_key.py
b/tests/integration/test_partitioning_key.py
new file mode 100644
index 00000000..12056bac
--- /dev/null
+++ b/tests/integration/test_partitioning_key.py
@@ -0,0 +1,772 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+import uuid
+from datetime import date, datetime, timedelta, timezone
+from decimal import Decimal
+from typing import Any, List
+
+import pytest
+from pyspark.sql import SparkSession
+from pyspark.sql.utils import AnalysisException
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.partitioning import PartitionField, PartitionFieldValue,
PartitionKey, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.transforms import (
+ BucketTransform,
+ DayTransform,
+ HourTransform,
+ IdentityTransform,
+ MonthTransform,
+ TruncateTransform,
+ YearTransform,
+)
+from pyiceberg.typedef import Record
+from pyiceberg.types import (
+ BinaryType,
+ BooleanType,
+ DateType,
+ DecimalType,
+ DoubleType,
+ FixedType,
+ FloatType,
+ IntegerType,
+ LongType,
+ NestedField,
+ StringType,
+ TimestampType,
+ TimestamptzType,
+ UUIDType,
+)
+
+# Schema shared by every parametrized case: one optional column per listed type.
+# The source_id of each PartitionField in the cases below refers to a field_id here.
+TABLE_SCHEMA = Schema(
+ NestedField(field_id=1, name="boolean_field", field_type=BooleanType(),
required=False),
+ NestedField(field_id=2, name="string_field", field_type=StringType(),
required=False),
+ NestedField(field_id=3, name="string_long_field", field_type=StringType(),
required=False),
+ NestedField(field_id=4, name="int_field", field_type=IntegerType(),
required=False),
+ NestedField(field_id=5, name="long_field", field_type=LongType(),
required=False),
+ NestedField(field_id=6, name="float_field", field_type=FloatType(),
required=False),
+ NestedField(field_id=7, name="double_field", field_type=DoubleType(),
required=False),
+ NestedField(field_id=8, name="timestamp_field",
field_type=TimestampType(), required=False),
+ NestedField(field_id=9, name="timestamptz_field",
field_type=TimestamptzType(), required=False),
+ NestedField(field_id=10, name="date_field", field_type=DateType(),
required=False),
+ # The time field is disabled, so field_id 11 is reused by binary_field below.
+ # NestedField(field_id=11, name="time", field_type=TimeType(),
required=False),
+ NestedField(field_id=11, name="binary_field", field_type=BinaryType(),
required=False),
+ NestedField(field_id=12, name="fixed_field", field_type=FixedType(16),
required=False),
+ NestedField(field_id=13, name="decimal_field", field_type=DecimalType(5,
2), required=False),
+ NestedField(field_id=14, name="uuid_field", field_type=UUIDType(),
required=False),
+)
+
+
+# Table name interpolated into the Spark CREATE/INSERT statements and loaded back through the catalog.
+identifier = "default.test_table"
+
+
+@pytest.mark.parametrize(
+ "partition_fields, partition_values, expected_partition_record,
expected_hive_partition_path_slice, spark_create_table_sql_for_justification,
spark_data_insert_sql_for_justification",
+ [
+ # # Identity Transform
+ (
+ [PartitionField(source_id=1, field_id=1001,
transform=IdentityTransform(), name="boolean_field")],
+ [False],
+ Record(boolean_field=False),
+ "boolean_field=false",
+ f"""CREATE TABLE {identifier} (
+ boolean_field boolean,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ identity(boolean_field) -- Partitioning by 'boolean_field'
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (false, 'Boolean field set to false');
+ """,
+ ),
+ (
+ [PartitionField(source_id=2, field_id=1001,
transform=IdentityTransform(), name="string_field")],
+ ["sample_string"],
+ Record(string_field="sample_string"),
+ "string_field=sample_string",
+ f"""CREATE TABLE {identifier} (
+ string_field string,
+ another_string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ identity(string_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ ('sample_string', 'Another string value')
+ """,
+ ),
+ (
+ [PartitionField(source_id=4, field_id=1001,
transform=IdentityTransform(), name="int_field")],
+ [42],
+ Record(int_field=42),
+ "int_field=42",
+ f"""CREATE TABLE {identifier} (
+ int_field int,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ identity(int_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (42, 'Associated string value for int 42')
+ """,
+ ),
+ (
+ [PartitionField(source_id=5, field_id=1001,
transform=IdentityTransform(), name="long_field")],
+ [1234567890123456789],
+ Record(long_field=1234567890123456789),
+ "long_field=1234567890123456789",
+ f"""CREATE TABLE {identifier} (
+ long_field bigint,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ identity(long_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (1234567890123456789, 'Associated string value for long
1234567890123456789')
+ """,
+ ),
+ (
+ [PartitionField(source_id=6, field_id=1001,
transform=IdentityTransform(), name="float_field")],
+ [3.14],
+ Record(float_field=3.14),
+ "float_field=3.14",
+ # Spark writes this differently from pyiceberg: Record[float_field=3.140000104904175], path: float_field=3.14 (only the Record differs),
+ # so the justification step (comparing the expected values with Spark's behavior) would fail.
+ None,
+ None,
+ # f"""CREATE TABLE {identifier} (
+ # float_field float,
+ # string_field string
+ # )
+ # USING iceberg
+ # PARTITIONED BY (
+ # identity(float_field)
+ # )
+ # """,
+ # f"""INSERT INTO {identifier}
+ # VALUES
+ # (3.14, 'Associated string value for float 3.14')
+ # """
+ ),
+ (
+ [PartitionField(source_id=7, field_id=1001,
transform=IdentityTransform(), name="double_field")],
+ [6.282],
+ Record(double_field=6.282),
+ "double_field=6.282",
+ # Spark writes this differently from pyiceberg: Record[double_field=6.2820000648498535], path: double_field=6.282 (only the Record differs),
+ # so the justification step (comparing the expected values with Spark's behavior) would fail.
+ None,
+ None,
+ # f"""CREATE TABLE {identifier} (
+ # double_field double,
+ # string_field string
+ # )
+ # USING iceberg
+ # PARTITIONED BY (
+ # identity(double_field)
+ # )
+ # """,
+ # f"""INSERT INTO {identifier}
+ # VALUES
+ # (6.282, 'Associated string value for double 6.282')
+ # """
+ ),
+ (
+ [PartitionField(source_id=8, field_id=1001,
transform=IdentityTransform(), name="timestamp_field")],
+ [datetime(2023, 1, 1, 12, 0, 1, 999)],
+ Record(timestamp_field=1672574401000999),
+ "timestamp_field=2023-01-01T12%3A00%3A01.000999",
+ f"""CREATE TABLE {identifier} (
+ timestamp_field timestamp_ntz,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ identity(timestamp_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01 12:00:01.000999' AS TIMESTAMP_NTZ), 'Associated
string value for timestamp 2023-01-01T12:00:00')
+ """,
+ ),
+ (
+ [PartitionField(source_id=8, field_id=1001,
transform=IdentityTransform(), name="timestamp_field")],
+ [datetime(2023, 1, 1, 12, 0, 1)],
+ Record(timestamp_field=1672574401000000),
+ "timestamp_field=2023-01-01T12%3A00%3A01",
+ f"""CREATE TABLE {identifier} (
+ timestamp_field timestamp_ntz,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ identity(timestamp_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01 12:00:01' AS TIMESTAMP_NTZ), 'Associated string
value for timestamp 2023-01-01T12:00:00')
+ """,
+ ),
+ (
+ [PartitionField(source_id=8, field_id=1001,
transform=IdentityTransform(), name="timestamp_field")],
+ [datetime(2023, 1, 1, 12, 0, 0)],
+ Record(timestamp_field=1672574400000000),
+ "timestamp_field=2023-01-01T12%3A00%3A00",
+ # Spark writes this differently from pyiceberg, so the justification step (comparing the expected value with Spark's behavior) would fail:
+ # AssertionError: assert 'timestamp_field=2023-01-01T12%3A00%3A00' in 's3://warehouse/default/test_table/data/timestamp_field=2023-01-01T12%3A00/00000-5-f9dca69a-9fb7-4830-9ef6-62d3d7afc09e-00001.parquet'
+ # TL;DR: CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ) becomes 2023-01-01T12:00 (without the seconds) in the Hive partition path when Spark writes it.
+ None,
+ None,
+ # f"""CREATE TABLE {identifier} (
+ # timestamp_field timestamp_ntz,
+ # string_field string
+ # )
+ # USING iceberg
+ # PARTITIONED BY (
+ # identity(timestamp_field)
+ # )
+ # """,
+ # f"""INSERT INTO {identifier}
+ # VALUES
+ # (CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ), 'Associated
string value for timestamp 2023-01-01T12:00:00')
+ # """
+ ),
+ (
+ [PartitionField(source_id=9, field_id=1001,
transform=IdentityTransform(), name="timestamptz_field")],
+ [datetime(2023, 1, 1, 12, 0, 1, 999,
tzinfo=timezone(timedelta(hours=3)))],
+ Record(timestamptz_field=1672563601000999),
+ "timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00",
+ # Spark writes this differently from pyiceberg, so the justification step (comparing the expected value with Spark's behavior) would fail:
+ # AssertionError: assert 'timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00' in 's3://warehouse/default/test_table/data/timestamptz_field=2023-01-01T09%3A00%3A01.000999Z/00000-5-b710fc4d-66b6-47f1-b8ae-6208f8aaa2d4-00001.parquet'
+ # TL;DR: CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP) becomes 2023-01-01T09:00:01.000999Z in the Hive partition path when Spark writes it (pyiceberg produces timestamptz_field=2023-01-01T09:00:01.000999+00:00).
+ None,
+ None,
+ # f"""CREATE TABLE {identifier} (
+ # timestamptz_field timestamp,
+ # string_field string
+ # )
+ # USING iceberg
+ # PARTITIONED BY (
+ # identity(timestamptz_field)
+ # )
+ # """,
+ # f"""INSERT INTO {identifier}
+ # VALUES
+ # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP),
'Associated string value for timestamp 2023-01-01 12:00:01.000999+03:00')
+ # """
+ ),
+ (
+ [PartitionField(source_id=10, field_id=1001,
transform=IdentityTransform(), name="date_field")],
+ [date(2023, 1, 1)],
+ Record(date_field=19358),
+ "date_field=2023-01-01",
+ f"""CREATE TABLE {identifier} (
+ date_field date,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ identity(date_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01' AS DATE), 'Associated string value for date
2023-01-01')
+ """,
+ ),
+ (
+ [PartitionField(source_id=14, field_id=1001,
transform=IdentityTransform(), name="uuid_field")],
+ [uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")],
+ Record(uuid_field="f47ac10b-58cc-4372-a567-0e02b2c3d479"),
+ "uuid_field=f47ac10b-58cc-4372-a567-0e02b2c3d479",
+ f"""CREATE TABLE {identifier} (
+ uuid_field string,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ identity(uuid_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ ('f47ac10b-58cc-4372-a567-0e02b2c3d479', 'Associated string value
for UUID f47ac10b-58cc-4372-a567-0e02b2c3d479')
+ """,
+ ),
+ (
+ [PartitionField(source_id=11, field_id=1001,
transform=IdentityTransform(), name="binary_field")],
+ [b'example'],
+ Record(binary_field=b'example'),
+ "binary_field=ZXhhbXBsZQ%3D%3D",
+ f"""CREATE TABLE {identifier} (
+ binary_field binary,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ identity(binary_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('example' AS BINARY), 'Associated string value for binary
`example`')
+ """,
+ ),
+ (
+ [PartitionField(source_id=13, field_id=1001,
transform=IdentityTransform(), name="decimal_field")],
+ [Decimal('123.45')],
+ Record(decimal_field=Decimal('123.45')),
+ "decimal_field=123.45",
+ f"""CREATE TABLE {identifier} (
+ decimal_field decimal(5,2),
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ identity(decimal_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (123.45, 'Associated string value for decimal 123.45')
+ """,
+ ),
+ # # Year Month Day Hour Transform
+ # Month Transform
+ (
+ [PartitionField(source_id=8, field_id=1001,
transform=MonthTransform(), name="timestamp_field_month")],
+ [datetime(2023, 1, 1, 11, 55, 59, 999999)],
+ Record(timestamp_field_month=((2023 - 1970) * 12)),
+ "timestamp_field_month=2023-01",
+ f"""CREATE TABLE {identifier} (
+ timestamp_field timestamp_ntz,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ month(timestamp_field) -- Partitioning by month from
'timestamp_field'
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP_NTZ), 'Event at
2023-01-01 11:55:59.999999');
+ """,
+ ),
+ (
+ [PartitionField(source_id=9, field_id=1001,
transform=MonthTransform(), name="timestamptz_field_month")],
+ [datetime(2023, 1, 1, 12, 0, 1, 999,
tzinfo=timezone(timedelta(hours=3)))],
+ Record(timestamptz_field_month=((2023 - 1970) * 12 + 1 - 1)),
+ "timestamptz_field_month=2023-01",
+ f"""CREATE TABLE {identifier} (
+ timestamptz_field timestamp,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ month(timestamptz_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at
2023-01-01 12:00:01.000999+03:00');
+ """,
+ ),
+ (
+ [PartitionField(source_id=10, field_id=1001,
transform=MonthTransform(), name="date_field_month")],
+ [date(2023, 1, 1)],
+ Record(date_field_month=((2023 - 1970) * 12)),
+ "date_field_month=2023-01",
+ f"""CREATE TABLE {identifier} (
+ date_field date,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ month(date_field) -- Partitioning by month from 'date_field'
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01');
+ """,
+ ),
+ # Year Transform
+ (
+ [PartitionField(source_id=8, field_id=1001,
transform=YearTransform(), name="timestamp_field_year")],
+ [datetime(2023, 1, 1, 11, 55, 59, 999999)],
+ Record(timestamp_field_year=(2023 - 1970)),
+ "timestamp_field_year=2023",
+ f"""CREATE TABLE {identifier} (
+ timestamp_field timestamp,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ year(timestamp_field) -- Partitioning by year from
'timestamp_field'
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at
2023-01-01 11:55:59.999999');
+ """,
+ ),
+ (
+ [PartitionField(source_id=9, field_id=1001,
transform=YearTransform(), name="timestamptz_field_year")],
+ [datetime(2023, 1, 1, 12, 0, 1, 999,
tzinfo=timezone(timedelta(hours=3)))],
+ Record(timestamptz_field_year=53),
+ "timestamptz_field_year=2023",
+ f"""CREATE TABLE {identifier} (
+ timestamptz_field timestamp,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ year(timestamptz_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at
2023-01-01 12:00:01.000999+03:00');
+ """,
+ ),
+ (
+ [PartitionField(source_id=10, field_id=1001,
transform=YearTransform(), name="date_field_year")],
+ [date(2023, 1, 1)],
+ Record(date_field_year=(2023 - 1970)),
+ "date_field_year=2023",
+ f"""CREATE TABLE {identifier} (
+ date_field date,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ year(date_field) -- Partitioning by year from 'date_field'
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01');
+ """,
+ ),
+ # # Day Transform
+ (
+ [PartitionField(source_id=8, field_id=1001,
transform=DayTransform(), name="timestamp_field_day")],
+ [datetime(2023, 1, 1, 11, 55, 59, 999999)],
+ Record(timestamp_field_day=19358),
+ "timestamp_field_day=2023-01-01",
+ f"""CREATE TABLE {identifier} (
+ timestamp_field timestamp,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ day(timestamp_field) -- Partitioning by day from
'timestamp_field'
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01');
+ """,
+ ),
+ (
+ [PartitionField(source_id=9, field_id=1001,
transform=DayTransform(), name="timestamptz_field_day")],
+ [datetime(2023, 1, 1, 12, 0, 1, 999,
tzinfo=timezone(timedelta(hours=3)))],
+ Record(timestamptz_field_day=19358),
+ "timestamptz_field_day=2023-01-01",
+ f"""CREATE TABLE {identifier} (
+ timestamptz_field timestamp,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ day(timestamptz_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at
2023-01-01 12:00:01.000999+03:00');
+ """,
+ ),
+ (
+ [PartitionField(source_id=10, field_id=1001,
transform=DayTransform(), name="date_field_day")],
+ [date(2023, 1, 1)],
+ Record(date_field_day=19358),
+ "date_field_day=2023-01-01",
+ f"""CREATE TABLE {identifier} (
+ date_field date,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ day(date_field) -- Partitioning by day from 'date_field'
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01');
+ """,
+ ),
+ # Hour Transform
+ (
+ [PartitionField(source_id=8, field_id=1001,
transform=HourTransform(), name="timestamp_field_hour")],
+ [datetime(2023, 1, 1, 11, 55, 59, 999999)],
+ Record(timestamp_field_hour=464603),
+ "timestamp_field_hour=2023-01-01-11",
+ f"""CREATE TABLE {identifier} (
+ timestamp_field timestamp,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ hour(timestamp_field) -- Partitioning by hour from
'timestamp_field'
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event
within the 11th hour of 2023-01-01');
+ """,
+ ),
+ (
+ [PartitionField(source_id=9, field_id=1001,
transform=HourTransform(), name="timestamptz_field_hour")],
+ [datetime(2023, 1, 1, 12, 0, 1, 999,
tzinfo=timezone(timedelta(hours=3)))],
+ Record(timestamptz_field_hour=464601),
+ "timestamptz_field_hour=2023-01-01-09",
+ f"""CREATE TABLE {identifier} (
+ timestamptz_field timestamp,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ hour(timestamptz_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at
2023-01-01 12:00:01.000999+03:00');
+ """,
+ ),
+ # Truncate Transform
+ (
+ [PartitionField(source_id=4, field_id=1001,
transform=TruncateTransform(10), name="int_field_trunc")],
+ [12345],
+ Record(int_field_trunc=12340),
+ "int_field_trunc=12340",
+ f"""CREATE TABLE {identifier} (
+ int_field int,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ truncate(int_field, 10) -- Truncating 'int_field' integer
column to a width of 10
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (12345, 'Sample data for int');
+ """,
+ ),
+ (
+ [PartitionField(source_id=5, field_id=1001,
transform=TruncateTransform(2), name="bigint_field_trunc")],
+ [2**32 + 1],
+ Record(bigint_field_trunc=2**32), # 4294967296
+ "bigint_field_trunc=4294967296",
+ f"""CREATE TABLE {identifier} (
+ bigint_field bigint,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ truncate(bigint_field, 2) -- Truncating 'bigint_field' long
column to a width of 2
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (4294967297, 'Sample data for long');
+ """,
+ ),
+ (
+ [PartitionField(source_id=2, field_id=1001,
transform=TruncateTransform(3), name="string_field_trunc")],
+ ["abcdefg"],
+ Record(string_field_trunc="abc"),
+ "string_field_trunc=abc",
+ f"""CREATE TABLE {identifier} (
+ string_field string,
+ another_string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ truncate(string_field, 3) -- Truncating 'string_field' string
column to a length of 3 characters
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ ('abcdefg', 'Another sample for string');
+ """,
+ ),
+ (
+ [PartitionField(source_id=13, field_id=1001,
transform=TruncateTransform(width=5), name="decimal_field_trunc")],
+ [Decimal('678.93')],
+ Record(decimal_field_trunc=Decimal('678.90')),
+ "decimal_field_trunc=678.90", # width 5 truncates the unscaled value: 67893 - (67893 % 5) = 67890, i.e. 678.90
+ f"""CREATE TABLE {identifier} (
+ decimal_field decimal(5,2),
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ truncate(decimal_field, 2)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (678.90, 'Associated string value for decimal 678.90')
+ """,
+ ),
+ (
+ [PartitionField(source_id=11, field_id=1001,
transform=TruncateTransform(10), name="binary_field_trunc")],
+ [b'HELLOICEBERG'],
+ Record(binary_field_trunc=b'HELLOICEBE'),
+ "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D",
+ f"""CREATE TABLE {identifier} (
+ binary_field binary,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ truncate(binary_field, 10) -- Truncating 'binary_field'
binary column to a length of 10 bytes
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (binary('HELLOICEBERG'), 'Sample data for binary');
+ """,
+ ),
+ # Bucket Transform
+ (
+ [PartitionField(source_id=4, field_id=1001,
transform=BucketTransform(2), name="int_field_bucket")],
+ [10],
+ Record(int_field_bucket=0),
+ "int_field_bucket=0",
+ f"""CREATE TABLE {identifier} (
+ int_field int,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ bucket(2, int_field) -- Distributing 'int_field' across 2
buckets
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (10, 'Integer with value 10');
+ """,
+ ),
+ # Test multiple field combinations could generate the Partition record
and hive partition path correctly
+ (
+ [
+ PartitionField(source_id=8, field_id=1001,
transform=YearTransform(), name="timestamp_field_year"),
+ PartitionField(source_id=10, field_id=1002,
transform=DayTransform(), name="date_field_day"),
+ ],
+ [
+ datetime(2023, 1, 1, 11, 55, 59, 999999),
+ date(2023, 1, 1),
+ ],
+ Record(timestamp_field_year=53, date_field_day=19358),
+ "timestamp_field_year=2023/date_field_day=2023-01-01",
+ f"""CREATE TABLE {identifier} (
+ timestamp_field timestamp,
+ date_field date,
+ string_field string
+ )
+ USING iceberg
+ PARTITIONED BY (
+ year(timestamp_field),
+ day(date_field)
+ )
+ """,
+ f"""INSERT INTO {identifier}
+ VALUES
+ (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP),
CAST('2023-01-01' AS DATE), 'some data');
+ """,
+ ),
+ ],
+)
+@pytest.mark.integration
+def test_partition_key(
+    session_catalog: Catalog,
+    spark: SparkSession,
+    partition_fields: List[PartitionField],
+    partition_values: List[Any],
+    expected_partition_record: Record,
+    expected_hive_partition_path_slice: str,
+    spark_create_table_sql_for_justification: str,
+    spark_data_insert_sql_for_justification: str,
+) -> None:
+    """Check PartitionKey's record and path, then cross-check both against a Spark write.
+
+    The two SQL arguments may be ``None`` (several parametrized cases pass ``None``);
+    the Spark cross-check is skipped unless both are provided.
+    """
+    partition_field_values = [PartitionFieldValue(field, value) for field, value in zip(partition_fields, partition_values)]
+    spec = PartitionSpec(*partition_fields)
+
+    key = PartitionKey(
+        raw_partition_field_values=partition_field_values,
+        partition_spec=spec,
+        schema=TABLE_SCHEMA,
+    )
+
+    # key.partition is used to write the metadata in DataFile, ManifestFile and all above layers
+    assert key.partition == expected_partition_record
+    # key.to_path() generates the hive partitioning part of the to-write parquet file path
+    assert key.to_path() == expected_hive_partition_path_slice
+
+    # Justify expected values are not made up but conform to spark behaviors
+    if spark_create_table_sql_for_justification is None or spark_data_insert_sql_for_justification is None:
+        return
+
+    try:
+        spark.sql(f"drop table {identifier}")
+    except AnalysisException:
+        # Deliberate best effort: the drop is allowed to fail, e.g. when there is nothing to drop yet.
+        pass
+
+    spark.sql(spark_create_table_sql_for_justification)
+    spark.sql(spark_data_insert_sql_for_justification)
+
+    iceberg_table = session_catalog.load_table(identifier=identifier)
+    snapshot = iceberg_table.current_snapshot()
+    assert snapshot
+    # Fetch the first entry of the first manifest once; both checks read the same data file.
+    spark_data_file = snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file
+    spark_partition_for_justification = spark_data_file.partition
+    spark_path_for_justification = spark_data_file.file_path
+    assert spark_partition_for_justification == expected_partition_record
+    assert expected_hive_partition_path_slice in spark_path_for_justification
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index 1eeec2b3..2b851e14 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -94,20 +94,6 @@ TABLE_SCHEMA = Schema(
)
-@pytest.fixture(scope="session")
-def session_catalog() -> Catalog:
- return load_catalog(
- "local",
- **{
- "type": "rest",
- "uri": "http://localhost:8181",
- "s3.endpoint": "http://localhost:9000",
- "s3.access-key-id": "admin",
- "s3.secret-access-key": "password",
- },
- )
-
-
@pytest.fixture(scope="session")
def pa_schema() -> pa.Schema:
return pa.schema([
@@ -231,40 +217,6 @@ def table_v1_v2_appended_with_null(session_catalog:
Catalog, arrow_table_with_nu
assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
-@pytest.fixture(scope="session")
-def spark() -> SparkSession:
- import importlib.metadata
- import os
-
- spark_version =
".".join(importlib.metadata.version("pyspark").split(".")[:2])
- scala_version = "2.12"
- iceberg_version = "1.4.3"
-
- os.environ["PYSPARK_SUBMIT_ARGS"] = (
- f"--packages
org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version},"
- f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version}
pyspark-shell"
- )
- os.environ["AWS_REGION"] = "us-east-1"
- os.environ["AWS_ACCESS_KEY_ID"] = "admin"
- os.environ["AWS_SECRET_ACCESS_KEY"] = "password"
-
- spark = (
- SparkSession.builder.appName("PyIceberg integration test")
- .config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
- .config("spark.sql.catalog.integration",
"org.apache.iceberg.spark.SparkCatalog")
- .config("spark.sql.catalog.integration.catalog-impl",
"org.apache.iceberg.rest.RESTCatalog")
- .config("spark.sql.catalog.integration.uri", "http://localhost:8181")
- .config("spark.sql.catalog.integration.io-impl",
"org.apache.iceberg.aws.s3.S3FileIO")
- .config("spark.sql.catalog.integration.warehouse",
"s3://warehouse/wh/")
- .config("spark.sql.catalog.integration.s3.endpoint",
"http://localhost:9000")
- .config("spark.sql.catalog.integration.s3.path-style-access", "true")
- .config("spark.sql.defaultCatalog", "integration")
- .getOrCreate()
- )
-
- return spark
-
-
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_query_count(spark: SparkSession, format_version: int) -> None: