rmnlchh opened a new issue, #9282: URL: https://github.com/apache/hudi/issues/9282
As part of our pipelines, we use tables that are written by DeltaStreamer. While trying to upgrade to EMR 6.11 (which brings Hudi 0.13.0 / Spark 3.3.2), we started facing the issue discussed in https://github.com/apache/hudi/issues/8061#issuecomment-1447657892. The fix below worked for all cases except those where we query the delta-streamed tables:

```scala
sc.set("spark.sql.legacy.parquet.nanosAsLong", "false");
sc.set("spark.sql.parquet.binaryAsString", "false");
sc.set("spark.sql.parquet.int96AsTimestamp", "true");
sc.set("spark.sql.caseSensitive", "false");
```

Steps to reproduce the behavior:

1. Use Hudi 0.13.0 and Spark 3.3.2.
2. Spark configs used:

   ```
   spark.shuffle.spill.compress -> true
   spark.serializer -> org.apache.spark.serializer.KryoSerializer
   spark.sql.warehouse.dir -> file:/XXX/cdp-datapipeline-curation/datalake-deltastreamer/spark-warehouse
   spark.sql.parquet.int96AsTimestamp -> true
   spark.io.compression.lz4.blockSize -> 64k
   spark.executor.extraJavaOptions -> -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
   spark.driver.host -> 127.0.0.1
   spark.sql.hive.convertMetastoreParquet -> false
   spark.broadcast.compress -> true
   spark.io.compression.codec -> snappy
   spark.sql.adaptive.skewJoin.enabled -> true
   spark.sql.parquet.binaryAsString -> false
   spark.driver.port -> 36083
   spark.rdd.compress -> true
   spark.io.compression.zstd.level -> 1
   spark.sql.caseSensitive -> false
   spark.shuffle.compress -> true
   spark.io.compression.zstd.bufferSize -> 64k
   spark.sql.catalog -> org.apache.spark.sql.hudi.catalog.HoodieCatalog
   spark.sql.parquet.int96RebaseModeInRead -> LEGACY
   spark.memory.storageFraction -> 0.20
   spark.app.name -> CreativeDeltaStreamerTest-creative-deltastreamer-1689954313
   spark.sql.parquet.datetimeRebaseModeInWrite -> LEGACY
   spark.sql.parquet.outputTimestampType -> TIMESTAMP_MICROS
   spark.sql.avro.datetimeRebaseModeInWrite -> LEGACY
   spark.sql.avro.compression.codec -> snappy
   spark.sql.legacy.parquet.nanosAsLong -> false
   spark.sql.extension -> org.apache.spark.sql.hudi.HoodieSparkSessionExtension
   spark.app.startTime -> 1689968713919
   spark.executor.id -> driver
   spark.sql.parquet.enableVectorizedReader -> true
   spark.sql.legacy.timeParserPolicy -> LEGACY
   spark.driver.extraJavaOptions -> -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
   spark.sql.parquet.datetimeRebaseModeInRead -> LEGACY
   spark.driver.memoryOverheadFactor -> 0.15
   spark.master -> local[*]
   spark.sql.parquet.filterPushdown -> true
   spark.executor.cores -> 1
   spark.memory.fraction -> 0.50
   spark.sql.avro.datetimeRebaseModeInRead -> LEGACY
   spark.executor.memoryOverheadFactor -> 0.20
   spark.sql.parquet.compression.codec -> snappy
   spark.sql.parquet.recordLevelFilter.enabled -> true
   spark.app.id -> local-1689968714613
   ```

3. DeltaStreamer configs used:

   ```
   hoodie.datasource.hive_sync.database -> datalake_ods_local
   hoodie.datasource.hive_sync.support_timestamp -> true
   hoodie.datasource.write.precombine.field -> StartDateUtc
   hoodie.datasource.hive_sync.partition_fields -> CampaignId
   hoodie.metadata.index.column.stats.enable -> true
   hoodie.cleaner.fileversions.retained -> 2
   hoodie.parquet.max.file.size -> 6291456
   hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> true
   hoodie.bloom.index.prune.by.ranges -> true
   hoodie.parquet.block.size -> 6291456
   hoodie.metadata.enable -> true
   hoodie.datasource.hive_sync.table -> published_ad
   hoodie.index.type -> BLOOM
   hoodie.parquet.compression.codec -> snappy
   hoodie.datasource.write.recordkey.field -> AdId
   hoodie.table.name -> published_ad
   hoodie.datasource.write.hive_style_partitioning -> true
   hoodie.datasource.meta.sync.base.path -> /XXXX/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/AdDeltaStreamerTest/Domain=CampaignBuild/Table=published_ad/
   hoodie.metadata.insert.parallelism -> 1
   hoodie.enable.data.skipping -> false
   hoodie.metadata.index.column.stats.parallelism -> 1
   hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.SimpleKeyGenerator
   hoodie.meta.sync.client.tool.class -> org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool
   hoodie.datasource.write.partitionpath.field -> CampaignId
   hoodie.index.bloom.num_entries -> 60000
   ```

4. Delta stream a table locally.
5. Try reading the table like this (a condensed, self-contained sketch of the full setup follows after these steps):

   ```scala
   println(s"READ CONFIG: ${readConfigurations.mkString("\n")}")
   val df = spark.read.format("hudi")
     .options(readConfigurations)
     .load(basePath)
   println(s"Querying hudi table: ${query}")
   df.createOrReplaceTempView(tempViewName)
   val selectedDF = spark.sql(query)
   ```

   The readConfigurations are:

   ```
   hoodie.datasource.hive_sync.database -> datalake_ods_local
   hoodie.datasource.hive_sync.support_timestamp -> true
   hoodie.datasource.write.precombine.field -> StartDateUtc
   hoodie.datasource.hive_sync.partition_fields -> CampaignId
   hoodie.metadata.index.column.stats.enable -> true
   hoodie.cleaner.fileversions.retained -> 2
   hoodie.parquet.max.file.size -> 6291456
   hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> true
   hoodie.bloom.index.prune.by.ranges -> true
   hoodie.parquet.block.size -> 6291456
   hoodie.metadata.enable -> true
   hoodie.datasource.hive_sync.table -> published_ad
   hoodie.index.type -> BLOOM
   hoodie.parquet.compression.codec -> snappy
   hoodie.datasource.write.recordkey.field -> AdId
   hoodie.table.name -> published_ad
   hoodie.datasource.write.hive_style_partitioning -> true
   hoodie.datasource.meta.sync.base.path -> /XXXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/AdDeltaStreamerTest/Domain=CampaignBuild/Table=published_ad/
   hoodie.metadata.insert.parallelism -> 1
   hoodie.enable.data.skipping -> false
   hoodie.metadata.index.column.stats.parallelism -> 1
   hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.SimpleKeyGenerator
   hoodie.meta.sync.client.tool.class -> org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool
   hoodie.datasource.write.partitionpath.field -> CampaignId
   hoodie.index.bloom.num_entries -> 60000
   ```

The `spark.sql` call is where it fails; `.show()` fails the same way. I expected the table to be read successfully and to be able to execute actions on the DataFrame.
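For reference, here is a minimal, self-contained sketch of the flow described in the steps above. It is only an illustration, not the actual pipeline code: the app name, base path, view name, and the reduced read-configuration map are hypothetical placeholders, and the legacy Parquet settings are the ones from the workaround quoted at the top of this issue.

```scala
import org.apache.spark.sql.SparkSession

// Condensed sketch of the setup above (hypothetical names and paths; not the actual pipeline code).
object HudiReadRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("HudiReadRepro") // hypothetical app name
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      // Workaround configs from https://github.com/apache/hudi/issues/8061#issuecomment-1447657892
      .config("spark.sql.legacy.parquet.nanosAsLong", "false")
      .config("spark.sql.parquet.binaryAsString", "false")
      .config("spark.sql.parquet.int96AsTimestamp", "true")
      .config("spark.sql.caseSensitive", "false")
      .getOrCreate()

    // Hypothetical placeholders; in the real test these point at the locally delta-streamed table
    // and carry the full read configuration listed in step 5.
    val basePath = "/tmp/AdDeltaStreamerTest/Domain=CampaignBuild/Table=published_ad"
    val readConfigurations = Map(
      "hoodie.metadata.enable" -> "true",
      "hoodie.datasource.write.recordkey.field" -> "AdId",
      "hoodie.datasource.write.partitionpath.field" -> "CampaignId"
    )

    val df = spark.read.format("hudi")
      .options(readConfigurations)
      .load(basePath)

    df.createOrReplaceTempView("published_ad")
    // Fails with IllegalArgumentException: For input string: "null" (full stacktrace below);
    // calling df.show() directly fails the same way.
    spark.sql("SELECT * FROM published_ad").show()
  }
}
```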
**Environment Description**

* Hudi version : 0.13.0
* Spark version : 3.3.2
* Hive version : spark_hive:3.3.2 (Hive 2.3.9, I suppose)
* Hadoop version : 3.3.3
* Storage (HDFS/S3/GCS..) : Local
* Running on Docker? (yes/no) : no

**Stacktrace**

```
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	...

Cause: java.lang.IllegalArgumentException: For input string: "null"
	at scala.collection.immutable.StringLike.parseBoolean(StringLike.scala:330)
	at scala.collection.immutable.StringLike.toBoolean(StringLike.scala:289)
	at scala.collection.immutable.StringLike.toBoolean$(StringLike.scala:289)
	at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:33)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.<init>(ParquetSchemaConverter.scala:70)
	at org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper$.buildImplicitSchemaChangeInfo(HoodieParquetFileFormatHelper.scala:30)
	at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:231)
	at org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:71)
	at org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:554)
	at org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:613)
	...

[Executor task launch worker for task 1.0 in stage 85.0 (TID 123)] ERROR org.apache.spark.executor.Executor - Exception in task 1.0 in stage 85.0 (TID 123)
java.lang.IllegalArgumentException: For input string: "null"
	at scala.collection.immutable.StringLike.parseBoolean(StringLike.scala:330) ~[scala-library-2.12.17.jar:?]
	at scala.collection.immutable.StringLike.toBoolean(StringLike.scala:289) ~[scala-library-2.12.17.jar:?]
	at scala.collection.immutable.StringLike.toBoolean$(StringLike.scala:289) ~[scala-library-2.12.17.jar:?]
	at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:33) ~[scala-library-2.12.17.jar:?]
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.<init>(ParquetSchemaConverter.scala:70) ~[spark-sql_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper$.buildImplicitSchemaChangeInfo(HoodieParquetFileFormatHelper.scala:30) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:3.3.2]
	at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:231) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:3.3.2]
	at org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:71) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
	at org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:554) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
	at org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:613) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:87) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.2.jar:3.3.2]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at java.lang.Thread.run(Thread.java:1589) ~[?:?]
```
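A note on what the stacktrace seems to indicate (my reading, not confirmed): the `ParquetToSparkSchemaConverter` constructor (ParquetSchemaConverter.scala:70 in Spark 3.3.2) calls Scala's `toBoolean` on a configuration value, and the value it receives is the literal string `"null"` rather than `"true"`/`"false"`. Given that the workaround above touches `spark.sql.legacy.parquet.nanosAsLong`, that key seems the most likely candidate, but I have not verified which key resolves to `"null"` in the Hudi reader path. A tiny plain-Scala sketch that reproduces exactly this exception message:

```scala
// Minimal illustration (plain Scala, no Spark/Hudi): StringOps.toBoolean only accepts
// "true"/"false" (case-insensitive); anything else, including the literal string "null",
// throws the exact exception seen in the stacktrace above.
object ToBooleanRepro {
  def main(args: Array[String]): Unit = {
    val value = "null" // hypothetical: what the converter appears to receive for a boolean config
    try {
      value.toBoolean
    } catch {
      case e: IllegalArgumentException =>
        // Prints: For input string: "null"
        println(e.getMessage)
    }
  }
}
```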