kevinjqliu commented on code in PR #2881:
URL: https://github.com/apache/iceberg-python/pull/2881#discussion_r2662506024
##########
pyiceberg/io/pyarrow.py:
##########
@@ -789,7 +789,7 @@ def visit_string(self, _: StringType) -> pa.DataType:
         return pa.large_string()

     def visit_uuid(self, _: UUIDType) -> pa.DataType:
-        return pa.uuid()
+        return pa.binary(16)
Review Comment:
Not sure if this is the right fix.
We explicitly changed `binary(16)` to `uuid` in this PR:
https://github.com/apache/iceberg-python/pull/2007/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR687
The current change reverts that.
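For context (this sketch is mine, not from the PR): the two representations carry the same 16 bytes and differ only in the Arrow-level type metadata. The example assumes a PyArrow version that ships the canonical `pa.uuid()` extension type (18.0+).
```python
import pyarrow as pa

payload = [b"\x00" * 16, b"\x11" * 16]  # two 16-byte UUID payloads

# Plain fixed-size binary, which is what this change would emit for UUIDType:
bin_arr = pa.array(payload, type=pa.binary(16))
print(bin_arr.type)  # fixed_size_binary[16]

# The canonical UUID extension type that #2007 switched to (PyArrow >= 18):
uuid_arr = pa.ExtensionArray.from_storage(pa.uuid(), bin_arr)
print(uuid_arr.type)               # arrow.uuid extension type
print(uuid_arr.type.storage_type)  # fixed_size_binary[16]

# Same bytes either way; only the extension/type metadata on the field differs.
print(uuid_arr.storage.equals(bin_arr))  # True
```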
##########
tests/integration/test_writes/test_writes.py:
##########
@@ -2530,3 +2530,63 @@ def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Cat
     assert tbl.metadata.next_row_id == initial_next_row_id + len(test_data), (
         "Expected next_row_id to be incremented by the number of added rows"
     )
+
+
[email protected]
+def test_write_uuid_in_pyiceberg_and_scan(session_catalog: Catalog, spark: SparkSession) -> None:
+    """Test UUID compatibility between PyIceberg and Spark.
+
+    UUIDs must be written as binary(16) for Spark compatibility since Java Arrow
+    metadata differs from Python Arrow metadata for UUID types.
+    """
+    identifier = "default.test_write_uuid_in_pyiceberg_and_scan"
+
+    catalog = load_catalog("default", type="in-memory")
+    catalog.create_namespace("ns")
+
+    schema = Schema(NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False))
+
+    test_data_with_null = {
+        "uuid_col": [
+            uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
+            None,
+            uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
+        ]
+    }
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    table = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=schema)
+
+    arrow_table = pa.table(test_data_with_null, schema=schema.as_arrow())
+
+    # Write with pyarrow
+    table.append(arrow_table)
+
+    # Write with pyspark
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ("22222222-2222-2222-2222-222222222222")
+        """
+    )
+    df = spark.table(identifier)
+
+    table.refresh()
+
+    assert df.count() == 4
+    assert len(table.scan().to_arrow()) == 4
+
+    result = df.where("uuid_col = '00000000-0000-0000-0000-000000000000'")
+    assert result.count() == 1
Review Comment:
I ran this test on the current main branch with 1.10.1 and this is the stacktrace. It is different from the stacktrace in #2007:
```
E       pyspark.errors.exceptions.connect.SparkException: Job aborted due to stage failure: Task 0 in stage 142.0 failed 1 times, most recent failure: Lost task 0.0 in stage 142.0 (TID 287) (fcaa97ba83c2 executor driver): java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.nio.ByteBuffer (java.util.UUID and java.nio.ByteBuffer are in module java.base of loader 'bootstrap')
E           at java.base/java.nio.ByteBuffer.compareTo(ByteBuffer.java:267)
E           at java.base/java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52)
E           at java.base/java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
E           at org.apache.iceberg.types.Comparators$NullSafeChainedComparator.compare(Comparators.java:306)
E           at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:352)
E           at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:79)
E           at org.apache.iceberg.expressions.ExpressionVisitors$BoundExpressionVisitor.predicate(ExpressionVisitors.java:162)
E           at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:390)
E           at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:409)
E           at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eval(ParquetMetricsRowGroupFilter.java:103)
E           at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter.shouldRead(ParquetMetricsRowGroupFilter.java:73)
E           at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:108)
E           at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
E           at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
E           at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:126)
E           at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:43)
E           at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:141)
E           at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:148)
E           at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:186)
E           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:72)
E           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:72)
E           at scala.Option.exists(Option.scala:406)
E           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:103)
E           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E           at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
E           at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
E           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
E           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
E           at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
E           at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
E           at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E           at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
E           at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
E           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
E           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
E           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
E           at org.apache.spark.scheduler.Task.run(Task.scala:147)
E           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
E           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
E           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
E           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
E           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
E           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
E           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
E           at java.base/java.lang.Thread.run(Thread.java:840)
E
E       Driver stacktrace:
E
E       JVM stacktrace:
E       org.apache.spark.SparkException
E           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
E           at scala.Option.getOrElse(Option.scala:201)
E           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
E           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
E           at scala.collection.immutable.List.foreach(List.scala:334)
E           at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
E           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
E           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
E           at scala.Option.foreach(Option.scala:437)
E           at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
E           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
E           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
E           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
E           at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
E           at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
E           at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
E           at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
E           at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
E           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E           at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
E           at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
E           at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$2(SparkConnectPlanExecution.scala:155)
E           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
E           at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
E           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
E           at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
E           at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
E           at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
E           at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
E           at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
E           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
E           at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
E           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
E           at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
E           at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
E           at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
E           at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.processAsArrowBatches(SparkConnectPlanExecution.scala:154)
E           at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:78)
E           at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handlePlan(ExecuteThreadRunner.scala:314)
E           at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
E           at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
E           at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
E           at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
E           at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
E           at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
E           at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
E           at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
E           at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
E           at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
E           at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
E           at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
E           at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
E           at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:347)
E       Caused by: java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.nio.ByteBuffer (java.util.UUID and java.nio.ByteBuffer are in module java.base of loader 'bootstrap')
E           at java.nio.ByteBuffer.compareTo(ByteBuffer.java:267)
E           at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52)
E           at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
E           at org.apache.iceberg.types.Comparators$NullSafeChainedComparator.compare(Comparators.java:306)
E           at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:352)
E           at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:79)
E           at org.apache.iceberg.expressions.ExpressionVisitors$BoundExpressionVisitor.predicate(ExpressionVisitors.java:162)
E           at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:390)
E           at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:409)
E           at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eval(ParquetMetricsRowGroupFilter.java:103)
E           at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter.shouldRead(ParquetMetricsRowGroupFilter.java:73)
E           at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:108)
E           at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
E           at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
E           at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:126)
E           at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:43)
E           at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:141)
E           at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:148)
E           at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:186)
E           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:72)
E           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:72)
E           at scala.Option.exists(Option.scala:406)
E           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:103)
E           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E           at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
E           at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(:-1)
E           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(:-1)
E           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(:-1)
E           at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
E           at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
E           at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E           at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
E           at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
E           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
E           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
E           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
E           at org.apache.spark.scheduler.Task.run(Task.scala:147)
E           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
E           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
E           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
E           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
E           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
E           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
E           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
E           at java.lang.Thread.run(Thread.java:840)
.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py:1882: SparkException
```
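One more thing that might be worth collecting (a hypothetical diagnostic of mine, not part of the PR): what each data file recorded as the Iceberg column bounds for `uuid_col`, since the comparator that blows up is comparing UUID/byte values. A sketch reusing `session_catalog` and the identifier from the test above:
```python
# Hypothetical diagnostic, not part of the PR: print the per-file lower/upper
# bounds recorded for uuid_col (field id 1 in the test schema).
tbl = session_catalog.load_table("default.test_write_uuid_in_pyiceberg_and_scan")
for task in tbl.scan().plan_files():
    data_file = task.file
    print(data_file.file_path)
    print("  lower bound:", (data_file.lower_bounds or {}).get(1))
    print("  upper bound:", (data_file.upper_bounds or {}).get(1))
```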
##########
tests/integration/test_writes/test_writes.py:
##########
@@ -2530,3 +2530,63 @@ def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Cat
     assert tbl.metadata.next_row_id == initial_next_row_id + len(test_data), (
         "Expected next_row_id to be incremented by the number of added rows"
     )
+
+
[email protected]
+def test_write_uuid_in_pyiceberg_and_scan(session_catalog: Catalog, spark: SparkSession) -> None:
+    """Test UUID compatibility between PyIceberg and Spark.
+
+    UUIDs must be written as binary(16) for Spark compatibility since Java Arrow
+    metadata differs from Python Arrow metadata for UUID types.
+    """
+    identifier = "default.test_write_uuid_in_pyiceberg_and_scan"
+
+    catalog = load_catalog("default", type="in-memory")
+    catalog.create_namespace("ns")
+
+    schema = Schema(NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False))
+
+    test_data_with_null = {
+        "uuid_col": [
+            uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
+            None,
+            uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
+        ]
+    }
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    table = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=schema)
+
+    arrow_table = pa.table(test_data_with_null, schema=schema.as_arrow())
+
+    # Write with pyarrow
+    table.append(arrow_table)
+
+    # Write with pyspark
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ("22222222-2222-2222-2222-222222222222")
+        """
+    )
+    df = spark.table(identifier)
+
+    table.refresh()
+
+    assert df.count() == 4
+    assert len(table.scan().to_arrow()) == 4
+
+    result = df.where("uuid_col = '00000000-0000-0000-0000-000000000000'")
+    assert result.count() == 1
Review Comment:
I also downloaded the two data files and dumped their schemas:
```
➜  Downloads parquet schema 00000-0-562a1d32-f5da-4e09-836a-b7d0d4e737e6.parquet
{
  "type" : "record",
  "name" : "schema",
  "fields" : [ {
    "name" : "uuid_col",
    "type" : [ "null", {
      "type" : "string",
      "logicalType" : "uuid"
    } ],
    "default" : null
  } ]
}
➜  Downloads parquet schema 00000-284-30c509c4-f8e9-46fb-a1f1-169e1c928e00-0-00001.parquet
{
  "type" : "record",
  "name" : "table",
  "fields" : [ {
    "name" : "uuid_col",
    "type" : {
      "type" : "string",
      "logicalType" : "uuid"
    }
  } ]
}
```
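Since the failing frame is `ParquetMetricsRowGroupFilter`, which evaluates the predicate against the row-group statistics in the Parquet footer, the min/max stats of the two files might also be worth comparing. A small sketch of mine (not from the PR) using PyArrow on the same two downloaded files:
```python
import pyarrow.parquet as pq

# Hypothetical local check, not part of the PR: dump the footer schema and the
# row-group min/max statistics for uuid_col in the two data files dumped above.
for path in [
    "00000-0-562a1d32-f5da-4e09-836a-b7d0d4e737e6.parquet",
    "00000-284-30c509c4-f8e9-46fb-a1f1-169e1c928e00-0-00001.parquet",
]:
    meta = pq.read_metadata(path)
    print(path)
    print(meta.schema)  # physical + logical types as PyArrow reads them
    for rg in range(meta.num_row_groups):
        stats = meta.row_group(rg).column(0).statistics  # column 0 == uuid_col
        if stats is not None and stats.has_min_max:
            print(f"  row group {rg}: min={stats.min!r} max={stats.max!r}")
        else:
            print(f"  row group {rg}: no min/max statistics")
```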