Sam-Serpoosh commented on issue #8519: URL: https://github.com/apache/hudi/issues/8519#issuecomment-1550008181
@the-other-tim-brown I was going to post some updates about this but you beat me to it :smile:

> My question is simply why are you having this issue when tens of other users/companies including myself are able to get debezium topics without the extra level of nesting.

The fact that others were not complaining about the **nested** issue also made me suspicious that something else must be going on here, and that this whole nested `Value` object **might** be a **red herring**. So here are some updates ...

### Our Hudi Version/Patch & What I Realized So Far

We're currently using a fork based off Hudi `0.11.1` with **some patches/commits** applied to it, such as the following (this setup predates me):

- [commit-1](https://github.com/rohitmittapalli/hudi/commit/b118df63eedceb50d9fa7045fea6f6ee44289e81)
- [commit-2](https://github.com/rohitmittapalli/hudi/commit/55f1f18033f23ae4fb93e9c1c2983dff2d57f3ad)
- [commit-3](https://github.com/rohitmittapalli/hudi/commit/7ffa56bc3d549289507dcf70e3fc1f6487cd4b90)
  - Related to `userProvidedSchemaProvider`
- Etc.

I rebuilt that fork and used the resulting JAR to run my job. I then noticed an **NPE** thrown because there was **no schema-provider class**: I was **not** passing the `--schemaprovider-class` CLI argument, so this line in `DeltaSync.java` threw the exception (understandably, since that provider was null):

```java
userProvidedSchemaProvider.refresh();
```

I had dropped that argument from my run/spark-submit command because of [this comment](https://github.com/apache/hudi/issues/6348#issuecomment-1223742672) that I stumbled upon the other day. However, that comment is no longer valid (presumably due to changes in Hudi since then). So I added this line back to my command:

```bash
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
```

## IllegalArgumentException At This Point

Then I hit the same issue that @sydneyhoran has posted about in #8521.
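(Side note on the earlier NPE, for anyone hitting the same thing: here is a minimal, self-contained sketch of the failure mode. The class and method names below are my own hypothetical reconstruction, not the actual Hudi source; the point is just that the provider field stays null when the CLI flag is absent, and `refresh()` is then invoked unconditionally.)

```java
import java.io.Serializable;

// Hypothetical sketch (NOT actual Hudi code) of why DeltaSync NPEs when
// --schemaprovider-class is not supplied: the user-provided schema provider
// is left null, and refresh() is later called on it unconditionally.
public class SchemaProviderNpeSketch {

    interface SchemaProvider extends Serializable {
        void refresh();
    }

    // Stand-in for the CLI wiring: null when --schemaprovider-class is absent.
    static SchemaProvider fromCliArg(String schemaProviderClass) {
        return schemaProviderClass == null ? null : () -> { /* no-op provider */ };
    }

    // Describes what happens for a given CLI value.
    static String sync(String schemaProviderClass) {
        SchemaProvider userProvidedSchemaProvider = fromCliArg(schemaProviderClass);
        try {
            userProvidedSchemaProvider.refresh(); // the call that NPEs in DeltaSync.java
            return "refreshed";
        } catch (NullPointerException e) {
            return "NPE: no schema provider configured";
        }
    }

    public static void main(String[] args) {
        System.out.println(sync(null));
        System.out.println(sync("org.apache.hudi.utilities.schema.SchemaRegistryProvider"));
    }
}
```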
So I manually applied the **same changes** that @ad1happy2go proposed in [this PR](https://github.com/apache/hudi/pull/7225/files), and that got me a bit further along.

## Not Even Reaching PostgresDebeziumSource

As you can see, the hypothesis about `PostgresDebeziumSource` and the **nested Value object** was a **red herring**: execution never even gets that far. With the aforementioned changes, I'm now seeing a new exception which is apparently thrown by [this line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala#L123), and that logic is invoked by [this line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java#L157). Here are the exception details:

```
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
	at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
	at org.apache.hudi.AvroConversionUtils$.createDataFrame(AvroConversionUtils.scala:123)
	at org.apache.hudi.AvroConversionUtils.createDataFrame(AvroConversionUtils.scala)
	at org.apache.hudi.utilities.sources.debezium.DebeziumSource.toDataset(DebeziumSource.java:169)
	at org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:134)
	at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
	at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:69)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:465)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:398)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
	... 4 more
Caused by: java.io.IOException: unexpected exception type
	at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	... 3 more
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
	... 53 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
	at org.apache.hudi.utilities.sources.debezium.DebeziumSource.$deserializeLambda$(DebeziumSource.java:64)
	... 63 more
```

## Is This One Really Due To Avro Schema Now?!

As I mentioned, the Hudi setup we currently have is a tad strange/outdated: `0.11.1` with some patches/commits applied. I **think** the best move on my end would be:

- Build a JAR with the **latest stable Hudi** (0.13.0) (assuming that's doable with our setup/ecosystem)
- Try running the **same DeltaStreamer job** with this new JAR and see what happens
- Post updates here ...

P.S.
For reference, this is my spark-submit command BTW:

```shell
$ spark-submit \
  --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
  --master spark://<SPARK_MASTER_URL>:<PORT> \
  --total-executor-cores 1 \
  --executor-memory 4g \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
  --conf spark.scheduler.mode=FAIR \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --target-base-path s3a://path/to/directory \
  --target-table <TABLE_NAME> \
  --min-sync-interval-seconds 30 \
  --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
  --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-ordering-field _event_lsn \
  --op UPSERT \
  --continuous \
  --source-limit 5000 \
  --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
  --hoodie-conf group.id=FOO-deltastreamer \
  --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
  --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
  --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
  --hoodie-conf hoodie.metadata.enable=true \
  --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
  --hoodie-conf hoodie.parquet.small.file.limit=134217728
```
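As an aside on the `Invalid lambda deserialization` error in the trace above: Java serializes lambdas as `SerializedLambda` objects, which are rehydrated by the capturing class's synthetic `$deserializeLambda$` method. If the class bytes on the deserializing side (the executor) differ from the ones that serialized the lambda (the driver), for example when mismatched Hudi jars end up on the classpath, `$deserializeLambda$` cannot match the lambda and throws exactly this `IllegalArgumentException`. That is part of why rebuilding against a single consistent Hudi version seems like the right next step. Here's a minimal sketch of the mechanism (my own illustration, not Hudi code); with the same class bytes on both sides, as in one JVM, the round trip succeeds:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Sketch of lambda serialization, the mechanism behind the
// "Invalid lambda deserialization" error when driver/executor jars diverge.
public class LambdaRoundTrip {

    // A lambda must target a Serializable functional interface to be serialized.
    interface SerFn extends Function<Integer, Integer>, Serializable {}

    static int roundTrip() throws IOException, ClassNotFoundException {
        SerFn fn = x -> x + 1;

        // Serialize, as Spark's JavaSerializer does when shipping a task closure.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(fn);
        }

        // Deserialize: this is where $deserializeLambda$ runs on the executor.
        // With identical class bytes it succeeds; with mismatched jars it throws.
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            SerFn back = (SerFn) ois.readObject();
            return back.apply(41);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip()); // prints 42
    }
}
```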