rkuhlercadent commented on issue #1401:
URL: https://github.com/apache/iceberg-python/issues/1401#issuecomment-2523813801
Here is a Python script that demonstrates the issue.
```python
import os
import datetime
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata, compute_statistics_plan
from pyiceberg.io.pyarrow import parquet_path_to_id_mapping
from pyiceberg.schema import Schema
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.table import TableProperties
from pyiceberg.typedef import Record
from pyiceberg.types import StringType, IntegerType, NestedField
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table.name_mapping import create_mapping_from_schema
import pyarrow as pa
import pyarrow.parquet as pq


def demonstrate_identity_partition_scan_issue():
    # we have petabytes of parquet data in hive format on s3 already that we are cataloging in iceberg format.
    # note that these parquet files do NOT have the partition columns in them, which is standard for hive format.
    # the partition values must be taken from the iceberg metadata for the identity partition columns as
    # specified in the iceberg spec: https://iceberg.apache.org/spec/#column-projection
    # "Values for field ids which are not present in a data file must be resolved according the following rules:
    #  Return the value from partition metadata if an Identity Transform exists for the field and the partition
    #  value is present in the partition struct on data_file object in the manifest. This allows for metadata
    #  only migrations of Hive tables."
    warehouse_path = os.path.dirname(os.path.realpath(__file__))
    namespace_name = "IDENTITY_PARTITION_SCAN_ISSUE_NAMESPACE"
    table_name = "IDENTITY_PARTITION_SCAN_ISSUE"
    catalog = get_iceberg_catalog(warehouse_path)
    drop_catalog_entities_for_test(catalog, namespace_name)

    # create sample hive files
    catalog.create_namespace(namespace_name)
    sample_hive_parquet_file = create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, 202412)

    # catalog existing hive data in iceberg
    table = create_iceberg_table(catalog, namespace_name, table_name)
    add_data_file(table, sample_hive_parquet_file, table.metadata.default_spec_id)

    # the partition_id columns should have values from the metadata, not null, in this output.
    # this same iceberg metadata correctly returns the partition_id column values in spark, athena, and snowflake.
    print(table.scan().to_arrow())


def get_iceberg_catalog(warehouse_path):
    # using sqlite catalog on local filesystem for demo
    catalog = SqlCatalog(
        "default",
        **{
            "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
            "warehouse": f"file://{warehouse_path}",
        })
    return catalog


def drop_catalog_entities_for_test(catalog, namespace_name):
    if namespace_name in [n[0] for n in catalog.list_namespaces()]:
        for _, table_name in catalog.list_tables(namespace_name):
            catalog.drop_table(f"{namespace_name}.{table_name}")
        catalog.drop_namespace(namespace_name)


def create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, partition_id):
    location = f"{warehouse_path}/{namespace_name}.db/{table_name}/data/partition_id={partition_id}/data.parquet"
    os.makedirs(os.path.dirname(location), exist_ok=True)
    name = datetime.datetime.strptime(str(partition_id), "%Y%m").strftime("%B %Y")
    names = pa.array([name], type=pa.string())
    pq.write_table(pa.table([names], names=["name"]), location)
    return {
        "location": location,
        "file_size": os.path.getsize(location),
        "partition_id": partition_id,
    }


def create_iceberg_table(catalog, namespace_name, table_name):
    print("creating iceberg table")
    schema = Schema(
        NestedField(field_id=1, name="partition_id", field_type=IntegerType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False))
    partition_spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_id"))
    table = catalog.create_table(
        f"{namespace_name}.{table_name}",
        schema=schema,
        partition_spec=partition_spec,
        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()})
    return table


def add_data_file(table, hive_data_file, spec_id):
    print("adding data file")
    parquet_metadata = pq.read_metadata(hive_data_file.get("location"))
    stats_columns = compute_statistics_plan(table.schema(), table.metadata.properties)
    statistics = data_file_statistics_from_parquet_metadata(
        parquet_metadata=parquet_metadata,
        stats_columns=stats_columns,
        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()))
    data_file = DataFile(
        content=DataFileContent.DATA,
        file_path=hive_data_file.get("location"),
        file_format=FileFormat.PARQUET,
        partition=Record(partition_id=hive_data_file.get("partition_id")),
        file_size_in_bytes=hive_data_file.get("file_size"),
        sort_order_id=None,
        spec_id=spec_id,
        equality_ids=None,
        key_metadata=None,
        **statistics.to_serialized_dict())
    with table.transaction() as tx:
        with tx.update_snapshot().overwrite() as update_snapshot:
            update_snapshot.append_data_file(data_file)


if __name__ == "__main__":
    demonstrate_identity_partition_scan_issue()
```
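
The spec rule quoted in the script's comments boils down to: when a field id is absent from the parquet file but an identity transform exists for it, the reader fills that column with the constant value recorded in the data file's partition struct. As a point of reference only, here is a minimal sketch of that projection in plain PyArrow; the function name and signature are mine for illustration and are not part of the pyiceberg read path.

```python
import pyarrow as pa


def project_identity_partition(file_table: pa.Table, column_name: str,
                               partition_value, column_type: pa.DataType) -> pa.Table:
    """Sketch of the column-projection rule: if an identity-partitioned column is not
    physically present in the data file, fill it with the constant partition value
    taken from the data file's partition struct in the manifest."""
    if column_name in file_table.column_names:
        # column exists in the file, nothing to project
        return file_table
    constant = pa.array([partition_value] * file_table.num_rows, type=column_type)
    return file_table.append_column(column_name, constant)


# e.g. a hive-style file under partition_id=202412 that only contains the "name" column
file_table = pa.table({"name": ["December 2024"]})
print(project_identity_partition(file_table, "partition_id", 202412, pa.int32()))
```

This is what Spark, Athena, and Snowflake appear to do with the same metadata, whereas the script above returns nulls for `partition_id` from the pyiceberg scan. If it helps to confirm that the partition value was actually recorded in the manifest, recent pyiceberg releases also expose metadata tables such as `table.inspect.entries()` (availability depends on the version).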