rmnlchh opened a new issue, #9282:
URL: https://github.com/apache/hudi/issues/9282

   As part of our pipelines, we use tables that are written by DeltaStreamer. While trying 
to upgrade to EMR 6.11 (which brings Hudi 0.13.0 / Spark 3.3.2), we started facing the issue 
discussed in
   https://github.com/apache/hudi/issues/8061#issuecomment-1447657892
   The fix suggested there, i.e.
   sc.set("spark.sql.legacy.parquet.nanosAsLong", "false");
   sc.set("spark.sql.parquet.binaryAsString", "false");
   sc.set("spark.sql.parquet.int96AsTimestamp", "true");
   sc.set("spark.sql.caseSensitive", "false");
   worked for all cases except those where we query the delta-streamed tables.
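
   For reference, this is roughly how we apply that workaround when building the session. A minimal sketch only: the app name, master, and builder usage here are illustrative rather than copied from our codebase; the four `spark.sql.*` keys are the ones from the comment above, and the serializer/extensions settings mirror the config dump in step 2 below.

   ```scala
   import org.apache.spark.sql.SparkSession

   // Sketch: apply the workaround from #8061 at session build time.
   val spark = SparkSession.builder()
     .appName("hudi-read-workaround")   // placeholder
     .master("local[*]")                // placeholder
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
     .config("spark.sql.legacy.parquet.nanosAsLong", "false")
     .config("spark.sql.parquet.binaryAsString", "false")
     .config("spark.sql.parquet.int96AsTimestamp", "true")
     .config("spark.sql.caseSensitive", "false")
     .getOrCreate()
   ```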
   
   Steps to reproduce the behavior:
   
   1. Use Hudi 0.13.0 and Spark 3.3.2.
   2. Use the following Spark configs:
   spark.shuffle.spill.compress -> true
   spark.serializer -> org.apache.spark.serializer.KryoSerializer
   spark.sql.warehouse.dir -> 
file:/XXX/cdp-datapipeline-curation/datalake-deltastreamer/spark-warehouse
   spark.sql.parquet.int96AsTimestamp -> true
   spark.io.compression.lz4.blockSize -> 64k
   spark.executor.extraJavaOptions -> -XX:+IgnoreUnrecognizedVMOptions 
--add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED 
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED 
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED 
--add-opens=java.base/sun.security.action=ALL-UNNAMED 
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED 
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
   spark.driver.host -> 127.0.0.1
   spark.sql.hive.convertMetastoreParquet -> false
   spark.broadcast.compress -> true
   spark.io.compression.codec -> snappy
   spark.sql.adaptive.skewJoin.enabled -> true
   spark.sql.parquet.binaryAsString -> false
   spark.driver.port -> 36083
   spark.rdd.compress -> true
   spark.io.compression.zstd.level -> 1
   spark.sql.caseSensitive -> false
   spark.shuffle.compress -> true
   spark.io.compression.zstd.bufferSize -> 64k
   spark.sql.catalog -> org.apache.spark.sql.hudi.catalog.HoodieCatalog
   spark.sql.parquet.int96RebaseModeInRead -> LEGACY
   spark.memory.storageFraction -> 0.20
   spark.app.name -> CreativeDeltaStreamerTest-creative-deltastreamer-1689954313
   spark.sql.parquet.datetimeRebaseModeInWrite -> LEGACY
   spark.sql.parquet.outputTimestampType -> TIMESTAMP_MICROS
   spark.sql.avro.datetimeRebaseModeInWrite -> LEGACY
   spark.sql.avro.compression.codec -> snappy
   spark.sql.legacy.parquet.nanosAsLong -> false
   spark.sql.extension -> org.apache.spark.sql.hudi.HoodieSparkSessionExtension
   spark.app.startTime -> 1689968713919
   spark.executor.id -> driver
   spark.sql.parquet.enableVectorizedReader -> true
   spark.sql.legacy.timeParserPolicy -> LEGACY
   spark.driver.extraJavaOptions -> -XX:+IgnoreUnrecognizedVMOptions 
--add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED 
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED 
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED 
--add-opens=java.base/sun.security.action=ALL-UNNAMED 
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED 
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
   spark.sql.parquet.datetimeRebaseModeInRead -> LEGACY
   spark.driver.memoryOverheadFactor -> 0.15
   spark.master -> local[*]
   spark.sql.parquet.filterPushdown -> true
   spark.executor.cores -> 1
   spark.memory.fraction -> 0.50
   spark.sql.avro.datetimeRebaseModeInRead -> LEGACY
   spark.executor.memoryOverheadFactor -> 0.20
   spark.sql.parquet.compression.codec -> snappy
   spark.sql.parquet.recordLevelFilter.enabled -> true
   spark.app.id -> local-1689968714613
   3. Use the following DeltaStreamer configs:
   hoodie.datasource.hive_sync.database -> datalake_ods_local
   hoodie.datasource.hive_sync.support_timestamp -> true
   hoodie.datasource.write.precombine.field -> StartDateUtc
   hoodie.datasource.hive_sync.partition_fields -> CampaignId
   hoodie.metadata.index.column.stats.enable -> true
   hoodie.cleaner.fileversions.retained -> 2
   hoodie.parquet.max.file.size -> 6291456
   hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> 
true
   hoodie.bloom.index.prune.by.ranges -> true
   hoodie.parquet.block.size -> 6291456
   hoodie.metadata.enable -> true
   hoodie.datasource.hive_sync.table -> published_ad
   hoodie.index.type -> BLOOM
   hoodie.parquet.compression.codec -> snappy
   hoodie.datasource.write.recordkey.field -> AdId
   hoodie.table.name -> published_ad
   hoodie.datasource.write.hive_style_partitioning -> true
   hoodie.datasource.meta.sync.base.path -> 
/XXXX/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/AdDeltaStreamerTest/Domain=CampaignBuild/Table=published_ad/
   hoodie.metadata.insert.parallelism -> 1
   hoodie.enable.data.skipping -> false
   hoodie.metadata.index.column.stats.parallelism -> 1
   hoodie.datasource.write.keygenerator.class -> 
org.apache.hudi.keygen.SimpleKeyGenerator
   hoodie.meta.sync.client.tool.class -> 
org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool
   hoodie.datasource.write.partitionpath.field -> CampaignId
   hoodie.index.bloom.num_entries -> 60000
   4. Delta stream a table locally.
   5. Try to read it with code like:
   // readConfigurations, basePath, query and tempViewName come from the test setup;
   // the readConfigurations values are listed below.
   println(s"READ CONFIG: ${readConfigurations.mkString("\n")}")
   val df = spark.read.format("hudi")
     .options(readConfigurations)
     .load(basePath)

   println(s"Querying hudi table: ${query}")
   df.createOrReplaceTempView(tempViewName)
   val selectedDF = spark.sql(query)
   The readConfigurations are:
   READ CONFIG: hoodie.datasource.hive_sync.database -> datalake_ods_local
   hoodie.datasource.hive_sync.support_timestamp -> true
   hoodie.datasource.write.precombine.field -> StartDateUtc
   hoodie.datasource.hive_sync.partition_fields -> CampaignId
   hoodie.metadata.index.column.stats.enable -> true
   hoodie.cleaner.fileversions.retained -> 2
   hoodie.parquet.max.file.size -> 6291456
   hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> 
true
   hoodie.bloom.index.prune.by.ranges -> true
   hoodie.parquet.block.size -> 6291456
   hoodie.metadata.enable -> true
   hoodie.datasource.hive_sync.table -> published_ad
   hoodie.index.type -> BLOOM
   hoodie.parquet.compression.codec -> snappy
   hoodie.datasource.write.recordkey.field -> AdId
   hoodie.table.name -> published_ad
   hoodie.datasource.write.hive_style_partitioning -> true
   hoodie.datasource.meta.sync.base.path -> 
/XXXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/AdDeltaStreamerTest/Domain=CampaignBuild/Table=published_ad/
   hoodie.metadata.insert.parallelism -> 1
   hoodie.enable.data.skipping -> false
   hoodie.metadata.index.column.stats.parallelism -> 1
   hoodie.datasource.write.keygenerator.class -> 
org.apache.hudi.keygen.SimpleKeyGenerator
   hoodie.meta.sync.client.tool.class -> 
org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool
   hoodie.datasource.write.partitionpath.field -> CampaignId
   hoodie.index.bloom.num_entries -> 60000
   
   spark.sql is where it fails; the same behaviour occurs with .show.
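
   To be explicit about where it blows up: load and createOrReplaceTempView succeed because they are lazy, and the exception only surfaces once an action materialises the query. A rough sketch (the view name and query are placeholders; readConfigurations and basePath are the values shown above):

   ```scala
   val df = spark.read.format("hudi")
     .options(readConfigurations)
     .load(basePath)                                      // fine, nothing is read yet

   df.createOrReplaceTempView("published_ad_view")        // fine, still lazy
   spark.sql("SELECT * FROM published_ad_view").show()    // IllegalArgumentException surfaces here
   df.show()                                              // same failure
   ```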
   
   
   I expected the table to be read successfully and to be able to execute actions on the 
DataFrame.
   
   **Environment Description**
   * Hudi version : 0.13.0
   * Spark version : 3.3.2
   * Hive version : spark_hive:3.3.2 (hive 2.3.9 I suppose)
   * Hadoop version : 3.3.3
   * Storage (HDFS/S3/GCS..) : Local
   * Running on Docker? (yes/no) : no
   
   **Stacktrace**
   Driver stacktrace:
   at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
   at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
   at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
   at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
   at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
   at scala.Option.foreach(Option.scala:407)
   ...
   Cause: java.lang.IllegalArgumentException: For input string: "null"
   at scala.collection.immutable.StringLike.parseBoolean(StringLike.scala:330)
   at scala.collection.immutable.StringLike.toBoolean(StringLike.scala:289)
   at scala.collection.immutable.StringLike.toBoolean$(StringLike.scala:289)
   at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:33)
   at 
org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.<init>(ParquetSchemaConverter.scala:70)
   at 
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper$.buildImplicitSchemaChangeInfo(HoodieParquetFileFormatHelper.scala:30)
   at 
org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:231)
   at 
org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:71)
   at 
org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:554)
   at 
org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:613)
   ...
   [Executor task launch worker for task 1.0 in stage 85.0 (TID 123)] ERROR 
org.apache.spark.executor.Executor - Exception in task 1.0 in stage 85.0 (TID 
123)
   java.lang.IllegalArgumentException: For input string: "null"
   at scala.collection.immutable.StringLike.parseBoolean(StringLike.scala:330) 
~[scala-library-2.12.17.jar:?]
   at scala.collection.immutable.StringLike.toBoolean(StringLike.scala:289) 
~[scala-library-2.12.17.jar:?]
   at scala.collection.immutable.StringLike.toBoolean$(StringLike.scala:289) 
~[scala-library-2.12.17.jar:?]
   at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:33) 
~[scala-library-2.12.17.jar:?]
   at 
org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.<init>(ParquetSchemaConverter.scala:70)
 ~[spark-sql_2.12-3.3.2.jar:3.3.2]
   at 
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper$.buildImplicitSchemaChangeInfo(HoodieParquetFileFormatHelper.scala:30)
 ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:3.3.2]
   at 
org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:231)
 ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:3.3.2]
   at 
org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:71)
 ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
   at 
org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:554)
 ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
   at 
org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:613)
 ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
   at 
org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:87) 
~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.scheduler.Task.run(Task.scala:136) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
 ~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) 
~[spark-core_2.12-3.3.2.jar:3.3.2]
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) 
~[?:?]
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) 
~[?:?]
   at java.lang.Thread.run(Thread.java:1589) ~[?:?]
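
   For what it's worth, the bottom frames (StringLike.parseBoolean / StringOps.toBoolean called from ParquetToSparkSchemaConverter.<init>) look like a null String being converted to a Boolean: with Scala 2.12, calling .toBoolean on a null String throws exactly `For input string: "null"`. A minimal sketch of that failure mode follows; the specific key used here is an assumption on our side (one of the workaround configs above), not something taken from the Spark/Hudi source:

   ```scala
   import org.apache.hadoop.conf.Configuration

   // Assumption: the Hadoop Configuration handed to the Parquet schema converter
   // does not contain the key, so get() returns null.
   val hadoopConf = new Configuration()
   val raw: String = hadoopConf.get("spark.sql.legacy.parquet.nanosAsLong") // null when unset

   // With Scala 2.12, StringOps.toBoolean on a null String throws
   // java.lang.IllegalArgumentException: For input string: "null"
   raw.toBoolean
   ```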
   
   

