kbendick edited a comment on issue #2962:
URL: https://github.com/apache/iceberg/issues/2962#issuecomment-925357611


   Hi @prodeezy @hankfanchiu , sorry for the delay in getting back to you on 
this. I was looking into a different parquet issue,[ 
PARQUET-2078](https://issues.apache.org/jira/browse/PARQUET-2078) so my 
apologies for overlooking this one.
   
   I'm also able to reproduce this using Spark 3.1.2 with Iceberg 0.9.0 
(compile only [email protected]) to Iceberg 0.11.0 (compile only [email protected]).
   
   I get the same error
   ```
   java.lang.IllegalArgumentException: [mapCol, key_value, key] required binary 
key (STRING) = 2 is not in the store: [] 12
        at 
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ColumnChunkPageReadStore.getPageReader(ColumnChunkPageReadStore.java:231)
        at 
org.apache.iceberg.parquet.ParquetValueReaders$PrimitiveReader.setPageSource(ParquetValueReaders.java:154)
        at 
org.apache.iceberg.parquet.ParquetValueReaders$RepeatedKeyValueReader.setPageSource(ParquetValueReaders.java:487)
        at 
org.apache.iceberg.parquet.ParquetValueReaders$StructReader.setPageSource(ParquetValueReaders.java:643)
        at 
org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:139)
        at 
org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:110)
        at 
org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:69)
        at 
org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
        at 
org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   Here's my minimal reproduction:
   
   I downloaded the Spark tarball for 3.1.2 from 
https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz,
 untarred it, and created a test table similar to yours.
   
   **Notice that by default, it's parquet-1.10.1, so I removed the parquet jars 
from /opt/spark/jars**
   ```bash
   # Note that this is parquet 1.10.1 using Apache Spark 3.1.2.
   root@spark:/opt/spark# ls -la jars | grep parquet
   parquet-column-1.10.1.jar  parquet-common-1.10.1.jar  
parquet-encoding-1.10.1.jar  parquet-format-2.4.0.jar  
parquet-hadoop-1.10.1.jar  parquet-jackson-1.10.1.jar
   
   root@spark:/opt/spark# rm -f jars/parquet-*
   
   root@spark-box:/opt/spark# ./bin/spark-shell     --packages 
org.apache.iceberg:iceberg-spark3-runtime:0.9.0    --driver-memory 2g     
--conf 
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog    
 --conf spark.sql.catalog.spark_catalog.type=hive      --conf 
spark.hadoop.hive.metastore.uris=thrift://hive:9083
   ```
   
   ```scala
   scala>  import spark.implicits._
   import spark.implicits._
   
   scala> import org.apache.spark.sql._
   import org.apache.spark.sql._
   
   scala> import org.apache.spark.sql.functions._
   import org.apache.spark.sql.functions._
   
   scala> import org.apache.spark.sql.expressions._
   import org.apache.spark.sql.expressions._
   
   scala>  spark.sql("CREATE TABLE IF NOT EXISTS 
test_parquet_map_regression_iceberg_090(mapCol MAP<STRING, STRUCT<payload: 
STRUCT<bool: BOOLEAN, dbl: DOUBLE, str: STRING>, str: STRING>>) USING ICEBERG 
TBLPROPERTIES('type'='hive')")
   
   scala> :paste
   // Entering paste mode (ctrl-D to finish)
   
       var df = spark.range(NUM_ROWS)
         .withColumnRenamed("id", "longCol")
         .withColumn("intCol", expr("CAST(longCol AS INT)"))
         .withColumn("dbl", expr("CAST(longCol AS DOUBLE)"))
         .withColumn("str", expr("CAST(longCol AS STRING)"))
         .withColumn("bool", expr("IF(intCol % 2 = 0, true, false)"))
         .withColumn("payload", struct($"bool", $"dbl", $"str"))
         .withColumn("value", struct($"payload", $"str"))
         .withColumn("mapCol", map($"str", $"value"))
         .select("mapCol")
   
   // Exiting paste mode, now interpreting.
   
   df: org.apache.spark.sql.DataFrame = [mapCol: 
map<string,struct<payload:struct<bool:boolean,dbl:double,str:string>,str:string>>]
   
   scala> df.writeTo("default.test_parquet_map_regression_iceberg_090").append
   
   scala>  
spark.table("default.test_parquet_map_regression_iceberg_090").show(false)
   +-------------------------------+
   |mapCol                         |
   +-------------------------------+
   |{0 -> {{true, 0.0, 0}, 0}}     |
   |{1 -> {{false, 1.0, 1}, 1}}    |
   |{2 -> {{true, 2.0, 2}, 2}}     |
   |{3 -> {{false, 3.0, 3}, 3}}    |
   |{4 -> {{true, 4.0, 4}, 4}}     |
   |{5 -> {{false, 5.0, 5}, 5}}    |
   |{6 -> {{true, 6.0, 6}, 6}}     |
   |{7 -> {{false, 7.0, 7}, 7}}    |
   |{8 -> {{true, 8.0, 8}, 8}}     |
   |{9 -> {{false, 9.0, 9}, 9}}    |
   |{10 -> {{true, 10.0, 10}, 10}} |
   |{11 -> {{false, 11.0, 11}, 11}}|
   |{12 -> {{true, 12.0, 12}, 12}} |
   |{13 -> {{false, 13.0, 13}, 13}}|
   |{14 -> {{true, 14.0, 14}, 14}} |
   |{15 -> {{false, 15.0, 15}, 15}}|
   |{16 -> {{true, 16.0, 16}, 16}} |
   |{17 -> {{false, 17.0, 17}, 17}}|
   |{18 -> {{true, 18.0, 18}, 18}} |
   |{19 -> {{false, 19.0, 19}, 19}}|
   +-------------------------------+
   only showing top 20 rows
   
   scala> :quit
   ```
   
   I then upgraded from Iceberg 0.9.0 (compiled against Parquet 0.11.0) to 
Iceberg 0.10.0 (compiled against Parquet 0.11.1)
   
   First, I tried writing a new table (to ensure that data is working when 
using the new format).
   ```bash
   root@spark-box:/opt/spark#  ./bin/spark-shell     --packages 
org.apache.iceberg:iceberg-spark3-runtime:0.10.0    --driver-memory 2g     
--conf 
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog    
 --conf spark.sql.catalog.spark_catalog.type=hive      --conf 
spark.hadoop.hive.metastore.uris=thrift://hive-box:9083
   ```
   
   First, I tried writing a new table (to ensure that data is working when 
using the new format).
   ```scala
   scala>  spark.sql("CREATE TABLE IF NOT EXISTS 
test_parquet_map_regression_iceberg_010(mapCol MAP<STRING, STRUCT<payload: 
STRUCT<bool: BOOLEAN, dbl: DOUBLE, str: STRING>, str: STRING>>) USING ICEBERG 
TBLPROPERTIES('type'='hive')")
   
   ... create df as above ....
   
   scala> 
spark.table("default.test_parquet_map_regression_iceberg_010").show(false)
   +-------------------------------+
   |mapCol                         |
   +-------------------------------+
   |{0 -> {{true, 0.0, 0}, 0}}     |
   |{1 -> {{false, 1.0, 1}, 1}}    |
   |{2 -> {{true, 2.0, 2}, 2}}     |
   |{3 -> {{false, 3.0, 3}, 3}}    |
   |{4 -> {{true, 4.0, 4}, 4}}     |
   |{5 -> {{false, 5.0, 5}, 5}}    |
   |{6 -> {{true, 6.0, 6}, 6}}     |
   |{7 -> {{false, 7.0, 7}, 7}}    |
   |{8 -> {{true, 8.0, 8}, 8}}     |
   |{9 -> {{false, 9.0, 9}, 9}}    |
   |{10 -> {{true, 10.0, 10}, 10}} |
   |{11 -> {{false, 11.0, 11}, 11}}|
   |{12 -> {{true, 12.0, 12}, 12}} |
   |{13 -> {{false, 13.0, 13}, 13}}|
   |{14 -> {{true, 14.0, 14}, 14}} |
   |{15 -> {{false, 15.0, 15}, 15}}|
   |{16 -> {{true, 16.0, 16}, 16}} |
   |{17 -> {{false, 17.0, 17}, 17}}|
   |{18 -> {{true, 18.0, 18}, 18}} |
   |{19 -> {{false, 19.0, 19}, 19}}|
   +-------------------------------+
   only showing top 20 rows
   ```
   
   So Parquet 1.11.1 data can be written and then read.
   
   However, the older data written with Parquet 1.11.0 cannot be read on the 
newer Iceberg version.
   
   ```scala
   scala> 
spark.table("default.test_parquet_map_regression_iceberg_090").show(false)
   21/09/22 21:49:24 ERROR executor.Executor: Exception in task 0.0 in stage 
2.0 (TID 2)
   java.lang.IllegalArgumentException: [mapCol, map, key] required binary key 
(STRING) = 2 is not in the store: [] 12
        at 
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ColumnChunkPageReadStore.getPageReader(ColumnChunkPageReadStore.java:231)
        at 
org.apache.iceberg.parquet.ParquetValueReaders$PrimitiveReader.setPageSource(ParquetValueReaders.java:185)
        at 
org.apache.iceberg.parquet.ParquetValueReaders$RepeatedKeyValueReader.setPageSource(ParquetValueReaders.java:529)
        at 
org.apache.iceberg.parquet.ParquetValueReaders$StructReader.setPageSource(ParquetValueReaders.java:685)
        at 
org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:142)
        at 
org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:112)
        at 
org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:81)
        at 
org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
        at 
org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   So this issue has actually been present for this particular map data since 
Iceberg 0.9.0 -> Iceberg 0.10.0 Upgrade (which is when we went from Parquet 
1.11.0 -> Parquet 1.11.1).
   
   I also tried with Iceberg 0.12.0, and as expected, the data written from 
Iceberg 0.10.0 is readable, but the data from Iceberg 0.9.0 (Parquet 1.11.0) is 
not readable.
   
   We should investigate if there's a fix in upcoming Parquet 1.12.1.
   
   I'm also going to check to see if this issue affects OSS Spark and not just 
Iceberg (I suspect that it does not as Spark 3.1.2 is still on Parquet 1.10.2 
but will double check).
   
   cc @rdblue 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to