Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-04-02 Thread via GitHub


Fokko commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1547810906


##
pyiceberg/manifest.py:
##
@@ -289,10 +286,7 @@ def partition_field_to_data_file_partition_field(partition_field_type: IcebergTy
 
 
 @partition_field_to_data_file_partition_field.register(LongType)
-@partition_field_to_data_file_partition_field.register(DateType)

Review Comment:
   This single-dispatch seems to be there only for the `TimeType`. Probably we should also convert those into a native type.
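
For context, Iceberg represents a time value as microseconds from midnight, so "converting those into a native type" here would mean carrying the partition value as a plain integer up front instead of special-casing `TimeType` in the dispatch. A small sketch of that conversion (illustrative only, not code from this PR):

```python
from datetime import time


def time_to_micros(t: time) -> int:
    """Convert a datetime.time to microseconds since midnight (Iceberg's native time representation)."""
    return ((t.hour * 60 + t.minute) * 60 + t.second) * 1_000_000 + t.microsecond


print(time_to_micros(time(19, 25, 0)))  # 69900000000
```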



##
tests/conftest.py:
##
@@ -2000,7 +2000,11 @@ def spark() -> "SparkSession":
 'float': [0.0, None, 0.9],
 'double': [0.0, None, 0.9],
 'timestamp': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
-'timestamptz': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
+'timestamptz': [

Review Comment:
   Nice one!



##
pyiceberg/table/__init__.py:
##
@@ -3111,3 +3147,112 @@ def snapshots(self) -> "pa.Table":
 snapshots,
 schema=snapshots_schema,
 )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+partition_key: PartitionKey
+arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = False) -> dict[str, Any]:
+order = 'ascending' if not reverse else 'descending'
+null_placement = 'at_start' if reverse else 'at_end'
+return {'sort_keys': [(column_name, order) for column_name in partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(arrow_table: pa.Table, partition_columns: list[str]) -> pa.Table:
+"""Given a table, sort it by current partition scheme."""
+# only works for identity for now
+sort_options = _get_partition_sort_order(partition_columns, reverse=False)
+sorted_arrow_table = arrow_table.sort_by(sorting=sort_options['sort_keys'], null_placement=sort_options['null_placement'])
+return sorted_arrow_table
+
+
+def get_partition_columns(
+spec: PartitionSpec,
+schema: Schema,
+) -> list[str]:
+partition_cols = []
+for partition_field in spec.fields:
+column_name = schema.find_column_name(partition_field.source_id)
+if not column_name:
+raise ValueError(f"{partition_field=} could not be found in 
{schema}.")
+partition_cols.append(column_name)
+return partition_cols
+
+
+def _get_table_partitions(
+arrow_table: pa.Table,
+partition_spec: PartitionSpec,
+schema: Schema,
+slice_instructions: list[dict[str, Any]],
+) -> list[TablePartition]:
+sorted_slice_instructions = sorted(slice_instructions, key=lambda x: x['offset'])
+
+partition_fields = partition_spec.fields
+
+offsets = [inst["offset"] for inst in sorted_slice_instructions]
+projected_and_filtered = {
+partition_field.source_id: arrow_table[schema.find_field(name_or_id=partition_field.source_id).name]
+.take(offsets)
+.to_pylist()
+for partition_field in partition_fields
+}
+
+table_partitions = []
+for idx, inst in enumerate(sorted_slice_instructions):
+partition_slice = arrow_table.slice(**inst)
+fieldvalues = [
+PartitionFieldValue(partition_field, projected_and_filtered[partition_field.source_id][idx])
+for partition_field in partition_fields
+]
+partition_key = PartitionKey(raw_partition_field_values=fieldvalues, partition_spec=partition_spec, schema=schema)
+table_partitions.append(TablePartition(partition_key=partition_key, arrow_table_partition=partition_slice))
+return table_partitions
+
+
+def partition(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> Iterable[TablePartition]:

Review Comment:
   It would be good to have somewhat longer, more descriptive function names. I also think we should hide this from the outside user.
   ```suggestion
   def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[TablePartition]:
   ```
   I think we can also return a list, so folks know that it is already 
materialized.
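
For context, the materialization point is the usual generator-versus-list trade-off (toy example, unrelated to pyiceberg internals):

```python
# A generator is single-use, which is easy to trip over if the caller iterates twice;
# returning a list makes the "already materialized" contract explicit.
def as_generator(n):
    return (i * i for i in range(n))


def as_list(n):
    return [i * i for i in range(n)]


gen = as_generator(3)
print(list(gen), list(gen))    # [0, 1, 4] [] -- the second pass is empty
print(as_list(3), as_list(3))  # [0, 1, 4] [0, 1, 4]
```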



##
tests/conftest.py:
##
@@ -2045,3 +2049,19 @@ def arrow_table_with_null(pa_schema: "pa.Schema") -> 
"pa.Table":
 
 """PyArrow table with all kinds of columns"""
 return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
+
+
+@pytest.fixture(scope="session")
+def arrow_table_without_data(pa_schema: "pa.Schema") -> "pa.Table":
+import pyarrow as pa
+
+"""PyArrow table with all kinds of columns."""

Review Comment:
   ```suggestion
   """PyArrow table with all kinds of columns."""
   
   import pyarrow as pa
   ```
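
The order matters because a string literal only counts as the function's docstring when it is the first statement in the body; with the import first, `__doc__` ends up `None`. A quick illustration (not part of the PR):

```python
def with_import_first():
    import pyarrow as pa  # noqa: F401
    """Not a docstring anymore."""


def with_docstring_first():
    """PyArrow table with all kinds of columns."""
    import pyarrow as pa  # noqa: F401


print(with_import_first.__doc__)     # None
print(with_docstring_first.__doc__)  # 'PyArrow table with all kinds of columns.'
```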



##
pyiceberg/table/__init__.py:
##
@@ -1131,8 +1133,11 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
 if not isinstance(df, pa.Table):
 raise Valu

Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-04-02 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1548068306


##
pyiceberg/typedef.py:
##
@@ -199,3 +199,7 @@ def __repr__(self) -> str:
 def record_fields(self) -> List[str]:
 """Return values of all the fields of the Record class except those 
specified in skip_fields."""
 return [self.__getattribute__(v) if hasattr(self, v) else None for v 
in self._position_to_field_name]
+
+def __hash__(self) -> int:
+"""Return hash value of the Record class."""
+return hash(str(self))

Review Comment:
   I think since `__repr__` is defined, `str()` might still work? I tested:
   ```
   r1 = Record(1,2)
   r2 = Record(x=1, y="string value")
   print("")
   print(str(r1), hash(str(r1)))
   print(str(r2), hash(str(r2)))
   ```
   
   prints:
   ```
   Record[field1=1, field2=2] -7504199255027864703
   Record[x=1, y='string value'] -4897332691101137012
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-04-02 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1548140319


##
pyiceberg/table/__init__.py:
##
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
 """
 from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
-if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
-raise ValueError("Cannot write to partitioned tables")
-
 counter = itertools.count(0)
 write_uuid = write_uuid or uuid.uuid4()
-
 target_file_size = PropertyUtil.property_as_int(
 properties=table_metadata.properties,
 property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
 default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
 )
+if target_file_size is None:
+raise ValueError(
+"Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES 
nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."

Review Comment:
   I just found that the default value itself could be None:
   ```PARQUET_COMPRESSION_LEVEL_DEFAULT = None```
   so this None check is not unnecessary?







Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-04-02 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1548140319


##
pyiceberg/table/__init__.py:
##
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
 """
 from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
-if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
-raise ValueError("Cannot write to partitioned tables")
-
 counter = itertools.count(0)
 write_uuid = write_uuid or uuid.uuid4()
-
 target_file_size = PropertyUtil.property_as_int(
 properties=table_metadata.properties,
 property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
 default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
 )
+if target_file_size is None:
+raise ValueError(
+"Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES 
nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."

Review Comment:
   I just found that the default value itself could be None:
   ```PARQUET_COMPRESSION_LEVEL_DEFAULT = None```
   so this None check is not unnecessary?
   
   The original code for this target_file_size check just `# type: ignore`s it.






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-04-03 Thread via GitHub


Fokko commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1550332689


##
pyiceberg/typedef.py:
##
@@ -199,3 +199,7 @@ def __repr__(self) -> str:
 def record_fields(self) -> List[str]:
 """Return values of all the fields of the Record class except those 
specified in skip_fields."""
 return [self.__getattribute__(v) if hasattr(self, v) else None for v 
in self._position_to_field_name]
+
+def __hash__(self) -> int:
+"""Return hash value of the Record class."""
+return hash(str(self))

Review Comment:
   Hmm, that looks good 👍  I'm always seeing empty records in PyCharm






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-04-04 Thread via GitHub


Fokko commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1552302617


##
pyiceberg/manifest.py:
##
@@ -283,31 +277,12 @@ def __repr__(self) -> str:
 }
 
 
-@singledispatch
-def partition_field_to_data_file_partition_field(partition_field_type: IcebergType) -> PrimitiveType:
-raise TypeError(f"Unsupported partition field type: {partition_field_type}")
-
-
-@partition_field_to_data_file_partition_field.register(LongType)
-@partition_field_to_data_file_partition_field.register(DateType)
-@partition_field_to_data_file_partition_field.register(TimeType)
-@partition_field_to_data_file_partition_field.register(TimestampType)
-@partition_field_to_data_file_partition_field.register(TimestamptzType)
-def _(partition_field_type: PrimitiveType) -> IntegerType:
-return IntegerType()
-
-
-@partition_field_to_data_file_partition_field.register(PrimitiveType)
-def _(partition_field_type: PrimitiveType) -> PrimitiveType:
-return partition_field_type
-
-
-def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
+def data_file_with_partition(partition_type: StructType, format_version: Literal[1, 2]) -> StructType:

Review Comment:
   Nit:
   ```suggestion
   def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
   ```



##
pyiceberg/manifest.py:
##
@@ -289,10 +286,7 @@ def partition_field_to_data_file_partition_field(partition_field_type: IcebergTy
 
 
 @partition_field_to_data_file_partition_field.register(LongType)
-@partition_field_to_data_file_partition_field.register(DateType)

Review Comment:
   Beautiful, thanks 👍 






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-04-05 Thread via GitHub


Fokko merged PR #555:
URL: https://github.com/apache/iceberg-python/pull/555





Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-28 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1543793736


##
pyiceberg/table/__init__.py:
##
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
 """
 from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
-if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
-raise ValueError("Cannot write to partitioned tables")
-
 counter = itertools.count(0)
 write_uuid = write_uuid or uuid.uuid4()
-
 target_file_size = PropertyUtil.property_as_int(
 properties=table_metadata.properties,
 property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
 default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
 )
+if target_file_size is None:
+raise ValueError(
+"Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES 
nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."
+)
 
-# This is an iter, so we don't have to materialize everything every time
-# This will be more relevant when we start doing partitioned writes
-yield from write_file(
-io=io,
-table_metadata=table_metadata,
-tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches 
in bin_pack_arrow_table(df, target_file_size)]),  # type: ignore
-)
+if any(len(spec.fields) > 0 for spec in table_metadata.partition_specs):

Review Comment:
   It seems the old line was not checking whether the table is partitioned, but rather whether the partition spec had evolved?
   ```if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:```
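
To make the distinction concrete, here is a toy illustration (made-up metadata shapes, not real pyiceberg objects) of how a table whose spec has evolved differs from a table that is currently partitioned:

```python
# Two specs in the table's history: the original identity spec and a later
# evolved spec that removed all partition fields (values are made up).
partition_specs = [
    {"spec_id": 0, "fields": [{"name": "dt", "transform": "identity"}]},  # original spec: partitioned
    {"spec_id": 1, "fields": []},                                         # current spec: effectively unpartitioned
]

old_check = len([s for s in partition_specs if s["spec_id"] != 0]) > 0        # True: the spec has evolved
any_spec_has_fields = any(len(s["fields"]) > 0 for s in partition_specs)      # True: some historical spec was partitioned
current_spec_has_fields = len(partition_specs[-1]["fields"]) > 0              # False: nothing to partition a new write by

print(old_check, any_spec_has_fields, current_spec_has_fields)
```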






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-28 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1543796652


##
tests/integration/test_partitioned_write.py:
##
@@ -0,0 +1,533 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+import uuid
+from datetime import date, datetime
+
+import pyarrow as pa
+import pytest
+import pytz
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import (
+BinaryType,
+BooleanType,
+DateType,
+DoubleType,
+FixedType,
+FloatType,
+IntegerType,
+LongType,
+NestedField,
+StringType,
+TimestampType,
+TimestamptzType,
+)
+
+
+@pytest.fixture()
+def catalog() -> Catalog:
+catalog = load_catalog(
+"local",
+**{
+"type": "rest",
+"uri": "http://localhost:8181";,
+"s3.endpoint": "http://localhost:9000";,
+"s3.access-key-id": "admin",
+"s3.secret-access-key": "password",
+},
+)
+
+try:
+catalog.create_namespace("default")
+except NamespaceAlreadyExistsError:
+pass
+
+return catalog
+
+
+TEST_DATA_WITH_NULL = {
+'bool': [False, None, True],
+'string': ['a', None, 'z'],
+# Go over the 16 bytes to kick in truncation
+'string_long': ['a' * 22, None, 'z' * 22],
+'int': [1, None, 9],
+'long': [1, None, 9],
+'float': [0.0, None, 0.9],
+'double': [0.0, None, 0.9],
+'timestamp': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 
19, 25, 00)],
+'timestamptz': [
+datetime(2023, 1, 1, 19, 25, 00, 
tzinfo=pytz.timezone('America/New_York')),
+None,
+datetime(2023, 3, 1, 19, 25, 00, 
tzinfo=pytz.timezone('America/New_York')),
+],
+'date': [date(2023, 1, 1), None, date(2023, 3, 1)],
+# Not supported by Spark
+# 'time': [time(1, 22, 0), None, time(19, 25, 0)],
+# Not natively supported by Arrow
+# 'uuid': [uuid.UUID('----').bytes, None, 
uuid.UUID('----').bytes],
+'binary': [b'\01', None, b'\22'],
+'fixed': [
+uuid.UUID('----').bytes,
+None,
+uuid.UUID('----').bytes,
+],
+}
+
+
+TABLE_SCHEMA = Schema(
+NestedField(field_id=1, name="bool", field_type=BooleanType(), 
required=False),
+NestedField(field_id=2, name="string", field_type=StringType(), 
required=False),
+NestedField(field_id=3, name="string_long", field_type=StringType(), 
required=False),
+NestedField(field_id=4, name="int", field_type=IntegerType(), 
required=False),
+NestedField(field_id=5, name="long", field_type=LongType(), 
required=False),
+NestedField(field_id=6, name="float", field_type=FloatType(), 
required=False),
+NestedField(field_id=7, name="double", field_type=DoubleType(), 
required=False),
+NestedField(field_id=8, name="timestamp", field_type=TimestampType(), 
required=False),
+NestedField(field_id=9, name="timestamptz", field_type=TimestamptzType(), 
required=False),
+NestedField(field_id=10, name="date", field_type=DateType(), 
required=False),
+# NestedField(field_id=11, name="time", field_type=TimeType(), 
required=False),
+# NestedField(field_id=12, name="uuid", field_type=UuidType(), 
required=False),
+NestedField(field_id=11, name="binary", field_type=BinaryType(), 
required=False),
+NestedField(field_id=12, name="fixed", field_type=FixedType(16), 
required=False),
+)
+
+
+@pytest.fixture(scope="session")
+def session_catalog() -> Catalog:
+return load_catalog(
+"local",
+**{
+"type": "rest",
+"uri": "http://localhost:8181";,
+"s3.endpoint": "http://localhost:9000";,
+"s3.access-key-id": "admin",
+"s3.secret-access-key": "password",
+},
+)
+
+
+@pytest.fi

Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-28 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1543797231


##
tests/integration/test_partitioned_write.py:
##
@@ -0,0 +1,533 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+import uuid
+from datetime import date, datetime
+
+import pyarrow as pa
+import pytest
+import pytz
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import (
+BinaryType,
+BooleanType,
+DateType,
+DoubleType,
+FixedType,
+FloatType,
+IntegerType,
+LongType,
+NestedField,
+StringType,
+TimestampType,
+TimestamptzType,
+)
+
+
+@pytest.fixture()
+def catalog() -> Catalog:
+catalog = load_catalog(
+"local",
+**{
+"type": "rest",
+"uri": "http://localhost:8181";,
+"s3.endpoint": "http://localhost:9000";,
+"s3.access-key-id": "admin",
+"s3.secret-access-key": "password",
+},
+)
+
+try:
+catalog.create_namespace("default")
+except NamespaceAlreadyExistsError:
+pass
+
+return catalog
+
+
+TEST_DATA_WITH_NULL = {

Review Comment:
   duplicate with test_writes, to cleanup






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-28 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1543796831


##
tests/integration/test_partitioned_write.py:
##
@@ -0,0 +1,533 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+import uuid
+from datetime import date, datetime
+
+import pyarrow as pa
+import pytest
+import pytz
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import (
+BinaryType,
+BooleanType,
+DateType,
+DoubleType,
+FixedType,
+FloatType,
+IntegerType,
+LongType,
+NestedField,
+StringType,
+TimestampType,
+TimestamptzType,
+)
+
+
+@pytest.fixture()
+def catalog() -> Catalog:
+catalog = load_catalog(
+"local",
+**{
+"type": "rest",
+"uri": "http://localhost:8181";,
+"s3.endpoint": "http://localhost:9000";,
+"s3.access-key-id": "admin",
+"s3.secret-access-key": "password",
+},
+)
+
+try:
+catalog.create_namespace("default")
+except NamespaceAlreadyExistsError:
+pass
+
+return catalog
+
+
+TEST_DATA_WITH_NULL = {
+'bool': [False, None, True],
+'string': ['a', None, 'z'],
+# Go over the 16 bytes to kick in truncation
+'string_long': ['a' * 22, None, 'z' * 22],
+'int': [1, None, 9],
+'long': [1, None, 9],
+'float': [0.0, None, 0.9],
+'double': [0.0, None, 0.9],
+'timestamp': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 
19, 25, 00)],
+'timestamptz': [
+datetime(2023, 1, 1, 19, 25, 00, 
tzinfo=pytz.timezone('America/New_York')),
+None,
+datetime(2023, 3, 1, 19, 25, 00, 
tzinfo=pytz.timezone('America/New_York')),
+],
+'date': [date(2023, 1, 1), None, date(2023, 3, 1)],
+# Not supported by Spark
+# 'time': [time(1, 22, 0), None, time(19, 25, 0)],
+# Not natively supported by Arrow
+# 'uuid': [uuid.UUID('----').bytes, None, 
uuid.UUID('----').bytes],
+'binary': [b'\01', None, b'\22'],
+'fixed': [
+uuid.UUID('----').bytes,
+None,
+uuid.UUID('----').bytes,
+],
+}
+
+
+TABLE_SCHEMA = Schema(
+NestedField(field_id=1, name="bool", field_type=BooleanType(), 
required=False),
+NestedField(field_id=2, name="string", field_type=StringType(), 
required=False),
+NestedField(field_id=3, name="string_long", field_type=StringType(), 
required=False),
+NestedField(field_id=4, name="int", field_type=IntegerType(), 
required=False),
+NestedField(field_id=5, name="long", field_type=LongType(), 
required=False),
+NestedField(field_id=6, name="float", field_type=FloatType(), 
required=False),
+NestedField(field_id=7, name="double", field_type=DoubleType(), 
required=False),
+NestedField(field_id=8, name="timestamp", field_type=TimestampType(), 
required=False),
+NestedField(field_id=9, name="timestamptz", field_type=TimestamptzType(), 
required=False),
+NestedField(field_id=10, name="date", field_type=DateType(), 
required=False),
+# NestedField(field_id=11, name="time", field_type=TimeType(), 
required=False),
+# NestedField(field_id=12, name="uuid", field_type=UuidType(), 
required=False),
+NestedField(field_id=11, name="binary", field_type=BinaryType(), 
required=False),
+NestedField(field_id=12, name="fixed", field_type=FixedType(16), 
required=False),
+)
+
+
+@pytest.fixture(scope="session")
+def session_catalog() -> Catalog:

Review Comment:
   moving to conftest



##
tests/integration/test_partitioned_write.py:
##
@@ -0,0 +1,533 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+

Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-28 Thread via GitHub


syun64 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544015221


##
pyiceberg/table/__init__.py:
##
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
 """
 from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
-if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
-raise ValueError("Cannot write to partitioned tables")
-
 counter = itertools.count(0)
 write_uuid = write_uuid or uuid.uuid4()
-
 target_file_size = PropertyUtil.property_as_int(
 properties=table_metadata.properties,
 property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
 default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
 )
+if target_file_size is None:
+raise ValueError(
+"Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES 
nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."

Review Comment:
   I have mixed feelings about this exception check, because we are setting the 
default value of `target_file_size` as 
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT right in the previous 
line. I feel as though this is too redundant.
   
   I understand why we are doing it though: 
   
   `PropertyUtil.property_as_int` returns `Optional[int]`, and bin_packing 
expects an int, so we need to type check it. 
   
   If we run into more of these type-checking redundancies in the code base, where we are using property values that are always expected to have a non-null default value, maybe we should refactor `PropertyUtil` instead. Maybe we can have two methods: `property_as_int`, which returns an `Optional[int]`, and `property_as_int_with_default`, which returns an `int`?
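
A rough sketch of how that split could look (using the names floated in this thread; pyiceberg's actual `PropertyUtil` may differ in detail):

```python
from typing import Dict, Optional


def property_as_int(properties: Dict[str, str], name: str, default: Optional[int] = None) -> Optional[int]:
    value = properties.get(name)
    return int(value) if value is not None else default


def property_as_int_with_default(properties: Dict[str, str], name: str, default: int) -> int:
    # Centralizes the "missing value and missing default" failure, so call sites
    # no longer need their own None checks.
    value = property_as_int(properties, name, default)
    if value is None:
        raise ValueError(f"Neither a value nor a default is configured for property {name}")
    return value
```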






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-28 Thread via GitHub


syun64 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544039629


##
pyiceberg/table/__init__.py:
##
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
 snapshots,
 schema=snapshots_schema,
 )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+partition_key: PartitionKey
+arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = 
False) -> dict[str, Any]:
+order = 'ascending' if not reverse else 'descending'
+null_placement = 'at_start' if reverse else 'at_end'
+return {'sort_keys': [(column_name, order) for column_name in 
partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, 
partition_columns: list[str]
+) -> pa.Table:
+"""Given a table sort it by current partition scheme with all transform 
functions supported."""
+from pyiceberg.transforms import IdentityTransform
+
+supported = {IdentityTransform}
+if not all(
+type(field.transform) in supported for field in 
iceberg_table_metadata.spec().fields if field in partition_columns
+):
+raise ValueError(
+f"Not all transforms are supported, get: {[transform in supported 
for transform in iceberg_table_metadata.spec().fields]}."
+)
+
+# only works for identity
+sort_options = _get_partition_sort_order(partition_columns, reverse=False)
+sorted_arrow_table = 
arrow_table.sort_by(sorting=sort_options['sort_keys'], 
null_placement=sort_options['null_placement'])
+return sorted_arrow_table
+
+
+def get_partition_columns(iceberg_table_metadata: TableMetadata, arrow_table: 
pa.Table) -> list[str]:
+arrow_table_cols = set(arrow_table.column_names)
+partition_cols = []
+for transform_field in iceberg_table_metadata.spec().fields:
+column_name = 
iceberg_table_metadata.schema().find_column_name(transform_field.source_id)
+if not column_name:
+raise ValueError(f"{transform_field=} could not be found in 
{iceberg_table_metadata.schema()}.")
+if column_name not in arrow_table_cols:
+continue
+partition_cols.append(column_name)
+return partition_cols
+
+
+def _get_table_partitions(
+arrow_table: pa.Table,
+partition_spec: PartitionSpec,
+schema: Schema,
+slice_instructions: list[dict[str, Any]],
+) -> list[TablePartition]:
+sorted_slice_instructions = sorted(slice_instructions, key=lambda x: 
x['offset'])
+
+partition_fields = partition_spec.fields
+
+offsets = [inst["offset"] for inst in sorted_slice_instructions]
+projected_and_filtered = {
+partition_field.source_id: 
arrow_table[schema.find_field(name_or_id=partition_field.source_id).name]
+.take(offsets)
+.to_pylist()
+for partition_field in partition_fields
+}
+
+table_partitions = []
+for inst in sorted_slice_instructions:
+partition_slice = arrow_table.slice(**inst)
+fieldvalues = [
+PartitionFieldValue(partition_field, 
projected_and_filtered[partition_field.source_id][inst["offset"]])
+for partition_field in partition_fields
+]
+partition_key = PartitionKey(raw_partition_field_values=fieldvalues, 
partition_spec=partition_spec, schema=schema)
+table_partitions.append(TablePartition(partition_key=partition_key, 
arrow_table_partition=partition_slice))
+
+return table_partitions
+
+
+def partition(iceberg_table_metadata: TableMetadata, arrow_table: pa.Table) -> 
Iterable[TablePartition]:
+"""Based on the iceberg table partition spec, slice the arrow table into 
partitions with their keys.
+
+Example:
+Input:
+An arrow table with partition key of ['n_legs', 'year'] and with data of
+{'year': [2020, 2022, 2022, 2021, 2022, 2022, 2022, 2019, 2021],
+ 'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100],
+ 'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", 
"Horse","Brittle stars", "Centipede"]}.
+The algorithm:
+Firstly we group the rows into partitions by sorting with sort order 
[('n_legs', 'descending'), ('year', 'descending')]
+and null_placement of "at_end".
+This gives the same table as raw input.
+Then we sort_indices using reverse order of [('n_legs', 'descending'), 
('year', 'descending')]
+and null_placement : "at_start".
+This gives:
+[8, 7, 4, 5, 6, 3, 1, 2, 0]
+Based on this we get partition groups of indices:
+[{'offset': 8, 'length': 1}, {'offset': 7, 'length': 1}, {'offset': 4, 
'length': 3}, {'offset': 3, 'length': 1}, {'offset': 1, 'length': 2}, 
{'offset': 0, 'length': 1}]
+We then retrieve the partition keys by offsets.
+And slice the arrow table by offsets and lengths of each partition.
+"""
+import pyarrow as pa
+
+partition_columns = get_p
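
As a simplified companion to the docstring above: the PR derives its slice instructions via the sort_indices trick described there, but the same `{'offset', 'length'}` groups can be illustrated with a plain Python scan over a table that is already sorted by its partition columns (a sketch, not the PR's code):

```python
import pyarrow as pa


def slice_instructions_for_sorted(table: pa.Table, partition_columns: list) -> list:
    """Group consecutive rows with equal partition-column values into
    {'offset': ..., 'length': ...} slices; assumes `table` is already sorted
    by `partition_columns`."""
    keys = list(zip(*(table[col].to_pylist() for col in partition_columns)))
    instructions = []
    start = 0
    for i in range(1, len(keys) + 1):
        # Close the current group when the key changes or the table ends.
        if i == len(keys) or keys[i] != keys[start]:
            instructions.append({"offset": start, "length": i - start})
            start = i
    return instructions


# Rows already sorted descending by (n_legs, year), mirroring the example above.
tbl = pa.table({
    "year": [2022, 2022, 2021, 2022, 2020],
    "n_legs": [4, 4, 4, 2, 2],
})
print(slice_instructions_for_sorted(tbl, ["n_legs", "year"]))
# [{'offset': 0, 'length': 2}, {'offset': 2, 'length': 1}, {'offset': 3, 'length': 1}, {'offset': 4, 'length': 1}]
```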

Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-29 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544752616


##
pyiceberg/table/__init__.py:
##
@@ -2489,16 +2488,28 @@ def _add_and_move_fields(
 class WriteTask:
 write_uuid: uuid.UUID
 task_id: int
+schema: Schema
 record_batches: List[pa.RecordBatch]
 sort_order_id: Optional[int] = None
+partition_key: Optional[PartitionKey] = None
 
-# Later to be extended with partition information
+def generate_data_file_partition_path(self) -> str:
+if self.partition_key is None:
+raise ValueError("Cannot generate partition path based on 
non-partitioned WriteTask")
+return self.partition_key.to_path()
 
 def generate_data_file_filename(self, extension: str) -> str:
 # Mimics the behavior in the Java API:
 # 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
 return f"0-{self.task_id}-{self.write_uuid}.{extension}"
 
+def generate_data_file_path(self, extension: str) -> str:
+if self.partition_key:
+file_path = f"{self.generate_data_file_partition_path()}/{self. 
generate_data_file_filename(extension)}"

Review Comment:
   resolved in the incoming commit






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-29 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544754492


##
pyiceberg/table/__init__.py:
##
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
 """
 from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
-if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
-raise ValueError("Cannot write to partitioned tables")
-
 counter = itertools.count(0)
 write_uuid = write_uuid or uuid.uuid4()
-
 target_file_size = PropertyUtil.property_as_int(
 properties=table_metadata.properties,
 property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
 default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
 )
+if target_file_size is None:
+raise ValueError(
+"Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES 
nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."

Review Comment:
   `property_as_int_with_default` sounds better to me, because all the exceptions raised due to a missing default property could be centralized in that function. How do you feel about it?






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-29 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544754857


##
pyiceberg/table/__init__.py:
##
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
 """
 from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
-if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
-raise ValueError("Cannot write to partitioned tables")
-
 counter = itertools.count(0)
 write_uuid = write_uuid or uuid.uuid4()
-
 target_file_size = PropertyUtil.property_as_int(
 properties=table_metadata.properties,
 property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
 default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
 )
+if target_file_size is None:
+raise ValueError(
+"Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES 
nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."
+)
 
-# This is an iter, so we don't have to materialize everything every time
-# This will be more relevant when we start doing partitioned writes
-yield from write_file(
-io=io,
-table_metadata=table_metadata,
-tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches 
in bin_pack_arrow_table(df, target_file_size)]),  # type: ignore
-)
+if any(len(spec.fields) > 0 for spec in table_metadata.partition_specs):

Review Comment:
   Yes, checking the most recent spec makes more sense.



##
pyiceberg/table/__init__.py:
##
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
 """
 from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
-if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
-raise ValueError("Cannot write to partitioned tables")
-
 counter = itertools.count(0)
 write_uuid = write_uuid or uuid.uuid4()
-
 target_file_size = PropertyUtil.property_as_int(
 properties=table_metadata.properties,
 property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
 default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
 )
+if target_file_size is None:
+raise ValueError(
+"Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES 
nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."
+)
 
-# This is an iter, so we don't have to materialize everything every time
-# This will be more relevant when we start doing partitioned writes
-yield from write_file(
-io=io,
-table_metadata=table_metadata,
-tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches 
in bin_pack_arrow_table(df, target_file_size)]),  # type: ignore
-)
+if any(len(spec.fields) > 0 for spec in table_metadata.partition_specs):

Review Comment:
   Yes, checking the most recent spec makes more sense. Fixed in the incoming commit.






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-29 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544756546


##
pyiceberg/table/__init__.py:
##
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
 snapshots,
 schema=snapshots_schema,
 )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+partition_key: PartitionKey
+arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = 
False) -> dict[str, Any]:
+order = 'ascending' if not reverse else 'descending'
+null_placement = 'at_start' if reverse else 'at_end'
+return {'sort_keys': [(column_name, order) for column_name in 
partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, 
partition_columns: list[str]
+) -> pa.Table:
+"""Given a table sort it by current partition scheme with all transform 
functions supported."""
+from pyiceberg.transforms import IdentityTransform
+
+supported = {IdentityTransform}
+if not all(
+type(field.transform) in supported for field in 
iceberg_table_metadata.spec().fields if field in partition_columns
+):
+raise ValueError(
+f"Not all transforms are supported, get: {[transform in supported 
for transform in iceberg_table_metadata.spec().fields]}."
+)
+
+# only works for identity
+sort_options = _get_partition_sort_order(partition_columns, reverse=False)
+sorted_arrow_table = 
arrow_table.sort_by(sorting=sort_options['sort_keys'], 
null_placement=sort_options['null_placement'])
+return sorted_arrow_table
+
+
+def get_partition_columns(iceberg_table_metadata: TableMetadata, arrow_table: 
pa.Table) -> list[str]:
+arrow_table_cols = set(arrow_table.column_names)
+partition_cols = []
+for transform_field in iceberg_table_metadata.spec().fields:

Review Comment:
   YES 👍  I changed the name in a later commit in the original PR, but overlooked it here. Resolved in the incoming commit.






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-29 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544770143


##
pyiceberg/table/__init__.py:
##
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
 snapshots,
 schema=snapshots_schema,
 )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+partition_key: PartitionKey
+arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = 
False) -> dict[str, Any]:
+order = 'ascending' if not reverse else 'descending'
+null_placement = 'at_start' if reverse else 'at_end'
+return {'sort_keys': [(column_name, order) for column_name in 
partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, 
partition_columns: list[str]
+) -> pa.Table:
+"""Given a table sort it by current partition scheme with all transform 
functions supported."""
+from pyiceberg.transforms import IdentityTransform
+
+supported = {IdentityTransform}
+if not all(
+type(field.transform) in supported for field in 
iceberg_table_metadata.spec().fields if field in partition_columns
+):
+raise ValueError(
+f"Not all transforms are supported, get: {[transform in supported 
for transform in iceberg_table_metadata.spec().fields]}."
+)
+
+# only works for identity
+sort_options = _get_partition_sort_order(partition_columns, reverse=False)
+sorted_arrow_table = 
arrow_table.sort_by(sorting=sort_options['sort_keys'], 
null_placement=sort_options['null_placement'])
+return sorted_arrow_table
+
+
+def get_partition_columns(iceberg_table_metadata: TableMetadata, arrow_table: 
pa.Table) -> list[str]:
+arrow_table_cols = set(arrow_table.column_names)
+partition_cols = []
+for transform_field in iceberg_table_metadata.spec().fields:
+column_name = 
iceberg_table_metadata.schema().find_column_name(transform_field.source_id)
+if not column_name:
+raise ValueError(f"{transform_field=} could not be found in 
{iceberg_table_metadata.schema()}.")
+if column_name not in arrow_table_cols:
+continue
+partition_cols.append(column_name)
+return partition_cols
+
+
+def _get_table_partitions(
+arrow_table: pa.Table,
+partition_spec: PartitionSpec,
+schema: Schema,
+slice_instructions: list[dict[str, Any]],
+) -> list[TablePartition]:
+sorted_slice_instructions = sorted(slice_instructions, key=lambda x: 
x['offset'])
+
+partition_fields = partition_spec.fields
+
+offsets = [inst["offset"] for inst in sorted_slice_instructions]
+projected_and_filtered = {
+partition_field.source_id: 
arrow_table[schema.find_field(name_or_id=partition_field.source_id).name]
+.take(offsets)
+.to_pylist()
+for partition_field in partition_fields
+}
+
+table_partitions = []
+for inst in sorted_slice_instructions:
+partition_slice = arrow_table.slice(**inst)
+fieldvalues = [
+PartitionFieldValue(partition_field, 
projected_and_filtered[partition_field.source_id][inst["offset"]])
+for partition_field in partition_fields
+]
+partition_key = PartitionKey(raw_partition_field_values=fieldvalues, 
partition_spec=partition_spec, schema=schema)
+table_partitions.append(TablePartition(partition_key=partition_key, 
arrow_table_partition=partition_slice))
+
+return table_partitions
+
+
+def partition(iceberg_table_metadata: TableMetadata, arrow_table: pa.Table) -> 
Iterable[TablePartition]:
+"""Based on the iceberg table partition spec, slice the arrow table into 
partitions with their keys.
+
+Example:
+Input:
+An arrow table with partition key of ['n_legs', 'year'] and with data of
+{'year': [2020, 2022, 2022, 2021, 2022, 2022, 2022, 2019, 2021],
+ 'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100],
+ 'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", 
"Horse","Brittle stars", "Centipede"]}.
+The algorithm:
+Firstly we group the rows into partitions by sorting with sort order 
[('n_legs', 'descending'), ('year', 'descending')]
+and null_placement of "at_end".
+This gives the same table as raw input.
+Then we sort_indices using reverse order of [('n_legs', 'descending'), 
('year', 'descending')]
+and null_placement : "at_start".
+This gives:
+[8, 7, 4, 5, 6, 3, 1, 2, 0]
+Based on this we get partition groups of indices:
+[{'offset': 8, 'length': 1}, {'offset': 7, 'length': 1}, {'offset': 4, 
'length': 3}, {'offset': 3, 'length': 1}, {'offset': 1, 'length': 2}, 
{'offset': 0, 'length': 1}]
+We then retrieve the partition keys by offsets.
+And slice the arrow table by offsets and lengths of each partition.
+"""
+import pyarrow as pa
+
+partition_columns = get_p

Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-29 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544770723


##
pyiceberg/table/__init__.py:
##
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
 snapshots,
 schema=snapshots_schema,
 )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+partition_key: PartitionKey
+arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = 
False) -> dict[str, Any]:
+order = 'ascending' if not reverse else 'descending'
+null_placement = 'at_start' if reverse else 'at_end'
+return {'sort_keys': [(column_name, order) for column_name in 
partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, 
partition_columns: list[str]
+) -> pa.Table:
+"""Given a table sort it by current partition scheme with all transform 
functions supported."""

Review Comment:
   updated in the incoming commit






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-29 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544783412


##
pyiceberg/table/__init__.py:
##
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
 snapshots,
 schema=snapshots_schema,
 )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+    partition_key: PartitionKey
+    arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = False) -> dict[str, Any]:
+    order = 'ascending' if not reverse else 'descending'
+    null_placement = 'at_start' if reverse else 'at_end'
+    return {'sort_keys': [(column_name, order) for column_name in partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+    iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, partition_columns: list[str]
+) -> pa.Table:
+    """Given a table, sort it by the current partition scheme with all transform functions supported."""
+    from pyiceberg.transforms import IdentityTransform
+
+    supported = {IdentityTransform}
+    if not all(
+        type(field.transform) in supported for field in iceberg_table_metadata.spec().fields if field in partition_columns
+    ):
+        raise ValueError(
+            f"Not all transforms are supported, get: {[transform in supported for transform in iceberg_table_metadata.spec().fields]}."

Review Comment:
   Added a test for a non-supported transform in the integration tests.
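
For reference, a hedged sketch of how an identity-only validation could be phrased so the error lists the offending partition fields; the helper name and the unannotated metadata parameter are illustrative, not the PR's final API.

from pyiceberg.transforms import IdentityTransform

def assert_identity_transforms(iceberg_table_metadata) -> None:
    # Collect every partition field whose transform is not an identity transform.
    unsupported = [
        field
        for field in iceberg_table_metadata.spec().fields
        if not isinstance(field.transform, IdentityTransform)
    ]
    if unsupported:
        raise ValueError(f"Not all transforms are supported, got: {unsupported}.")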






Re: [PR] Partitioned Append on Identity Transform [iceberg-python]

2024-03-29 Thread via GitHub


jqin61 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544785650


##
pyiceberg/table/__init__.py:
##
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
 snapshots,
 schema=snapshots_schema,
 )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+    partition_key: PartitionKey
+    arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = False) -> dict[str, Any]:
+    order = 'ascending' if not reverse else 'descending'
+    null_placement = 'at_start' if reverse else 'at_end'
+    return {'sort_keys': [(column_name, order) for column_name in partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+    iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, partition_columns: list[str]
+) -> pa.Table:
+    """Given a table, sort it by the current partition scheme with all transform functions supported."""
+    from pyiceberg.transforms import IdentityTransform
+
+    supported = {IdentityTransform}
+    if not all(
+        type(field.transform) in supported for field in iceberg_table_metadata.spec().fields if field in partition_columns
+    ):
+        raise ValueError(
+            f"Not all transforms are supported, get: {[transform in supported for transform in iceberg_table_metadata.spec().fields]}."
+        )
+
+    # only works for identity
+    sort_options = _get_partition_sort_order(partition_columns, reverse=False)
+    sorted_arrow_table = arrow_table.sort_by(sorting=sort_options['sort_keys'], null_placement=sort_options['null_placement'])
+    return sorted_arrow_table
+
+
+def get_partition_columns(iceberg_table_metadata: TableMetadata, arrow_table: pa.Table) -> list[str]:
+    arrow_table_cols = set(arrow_table.column_names)
+    partition_cols = []
+    for transform_field in iceberg_table_metadata.spec().fields:
+        column_name = iceberg_table_metadata.schema().find_column_name(transform_field.source_id)
+        if not column_name:
+            raise ValueError(f"{transform_field=} could not be found in {iceberg_table_metadata.schema()}.")
+        if column_name not in arrow_table_cols:
+            continue
+        partition_cols.append(column_name)
+    return partition_cols
+
+
+def _get_table_partitions(
+    arrow_table: pa.Table,
+    partition_spec: PartitionSpec,
+    schema: Schema,
+    slice_instructions: list[dict[str, Any]],
+) -> list[TablePartition]:
+    sorted_slice_instructions = sorted(slice_instructions, key=lambda x: x['offset'])
+
+    partition_fields = partition_spec.fields
+
+    offsets = [inst["offset"] for inst in sorted_slice_instructions]
+    projected_and_filtered = {
+        partition_field.source_id: arrow_table[schema.find_field(name_or_id=partition_field.source_id).name]
+        .take(offsets)
+        .to_pylist()
+        for partition_field in partition_fields
+    }
+
+    table_partitions = []
+    for inst in sorted_slice_instructions:
+        partition_slice = arrow_table.slice(**inst)
+        fieldvalues = [
+            PartitionFieldValue(partition_field, projected_and_filtered[partition_field.source_id][inst["offset"]])
+            for partition_field in partition_fields
+        ]
+        partition_key = PartitionKey(raw_partition_field_values=fieldvalues, partition_spec=partition_spec, schema=schema)
+        table_partitions.append(TablePartition(partition_key=partition_key, arrow_table_partition=partition_slice))
+
+    return table_partitions
+
+
+def partition(iceberg_table_metadata: TableMetadata, arrow_table: pa.Table) -> Iterable[TablePartition]:

Review Comment:
   Yep, totally agree. I updated it in the later commit in the original PR with the exact changes you mentioned and also added unit tests around this function. Let me pull in those changes.
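
As a side note on the offset projection in the diff above, here is a small illustrative sketch with toy data (not code from the PR): take() returns one key value per slice instruction, so the values pair with the sorted instructions by position rather than by raw offset.

import pyarrow as pa

# One partition-source column of an already partition-sorted table, plus the
# slice instructions describing its contiguous groups (toy values).
column = pa.chunked_array([[2019, 2020, 2021, 2021, 2022]])
slice_instructions = [
    {'offset': 0, 'length': 1},
    {'offset': 1, 'length': 1},
    {'offset': 2, 'length': 2},
    {'offset': 4, 'length': 1},
]

# Project one key value per group: take() returns values in instruction order,
# so the i-th taken value pairs with the i-th instruction.
offsets = [inst['offset'] for inst in slice_instructions]
key_values = column.take(offsets).to_pylist()
for idx, inst in enumerate(slice_instructions):
    print(key_values[idx], inst)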



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


