Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1550008181

   @the-other-tim-brown I was going to post some updates about this, but you beat me to it :smile:
   
   > My question is simply why are you having this issue when tens of other users/companies including myself are able to get debezium topics without the extra level of nesting.
   
   The fact that others aren't complaining about the **nested** issue also made me suspicious that something else must be going on here, and that this whole nested `Value` object **might** be a **red herring**. So here are some updates ...
   
   ## Our Hudi Version/Patch & What I Realized So Far
   
   We're currently using a FORK based on Hudi `0.11.1` with **some patches/commits** applied to it, such as the following (this setup predates me):
   
   - [commit-1](https://github.com/rohitmittapalli/hudi/commit/b118df63eedceb50d9fa7045fea6f6ee44289e81)
   - [commit-2](https://github.com/rohitmittapalli/hudi/commit/55f1f18033f23ae4fb93e9c1c2983dff2d57f3ad)
   - [commit-3](https://github.com/rohitmittapalli/hudi/commit/7ffa56bc3d549289507dcf70e3fc1f6487cd4b90)
     - Related to `userProvidedSchemaProvider`
   - Etc.
   
   I rebuilt that fork and used the resulting JAR to run my job. I then noticed an **NPE** thrown due to the **missing schema-provider class**, since I was **not** passing the `--schemaprovider-class` CLI argument. So this line in `DeltaSync.java` was throwing the exception (understandably, since that provider was null):
   
   ```java
   userProvidedSchemaProvider.refresh();
   ```
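
   As a minimal sketch of the failure mode (assuming the surrounding `DeltaSync` logic; this is illustration, not the actual Hudi code): with no `--schemaprovider-class`, the field is null and the unconditional `refresh()` dereferences it. A guard like the following would fail with a clearer message:

   ```java
   // Hypothetical guard for illustration; not the real DeltaSync code.
   // When --schemaprovider-class is omitted, userProvidedSchemaProvider
   // is null, so calling refresh() unconditionally throws an NPE.
   if (userProvidedSchemaProvider == null) {
     throw new IllegalStateException(
         "No schema provider configured; pass --schemaprovider-class");
   }
   userProvidedSchemaProvider.refresh();
   ```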
   
   I was **not** passing that argument to my run/spark-submit command because of [this comment](https://github.com/apache/hudi/issues/6348#issuecomment-1223742672) I stumbled upon the other day. However, that comment is no longer valid (presumably due to changes in Hudi since then). So I added this line back to my command:
   
   ```bash
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
   ```
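
   For context, here's a rough sketch of what a registry-backed schema provider conceptually does (my assumption for illustration; the real `SchemaRegistryProvider` also unwraps the Confluent registry's JSON envelope and supports credentials in the URL):

   ```java
   import java.io.InputStream;
   import java.net.URL;
   import org.apache.avro.Schema;

   // Illustrative sketch only, not Hudi's actual SchemaRegistryProvider:
   // fetch the Avro schema from the endpoint configured via
   // hoodie.deltastreamer.schemaprovider.registry.url. This assumes the
   // endpoint returns raw Avro schema JSON; a Confluent registry actually
   // wraps it in a {"schema": "..."} envelope that must be unwrapped.
   public class RegistrySchemaSketch {
     public static Schema fetchSchema(String registryUrl) throws Exception {
       try (InputStream in = new URL(registryUrl).openStream()) {
         return new Schema.Parser().parse(in);
       }
     }
   }
   ```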
   
   ## IllegalArgumentException At This Point
   
   Then I hit the same issue that @sydneyhoran posted about in #8521. So I manually applied the **same changes** that @ad1happy2go proposed in [this PR](https://github.com/apache/hudi/pull/7225/files), and that got me a bit further along.
   
   ## Not Even Reaching PostgresDebeziumSource
   
   As you can see, the hypothesis about `PostgresDebeziumSource` and the **nested Value object** was a **red herring**, since we haven't even gotten that far in the execution path. With the aforementioned changes, I'm now seeing a new exception which is (apparently) thrown by [this line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala#L123), and that logic is invoked by [this line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java#L157). Here are the exception details:
   
   ```
   Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
     at scala.Option.foreach(Option.scala:407)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
     at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
     at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
     at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
     at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
     at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
     at org.apache.hudi.AvroConversionUtils$.createDataFrame(AvroConversionUtils.scala:123)
     at org.apache.hudi.AvroConversionUtils.createDataFrame(AvroConversionUtils.scala)
     at org.apache.hudi.utilities.sources.debezium.DebeziumSource.toDataset(DebeziumSource.java:169)
     at org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:134)
     at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
     at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
     at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:69)
     at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:465)
     at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:398)
     at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
     ... 4 more
   Caused by: java.io.IOException: unexpected exception type
     at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
     at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
     at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
     at org.apache.spark.scheduler.Task.run(Task.scala:131)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
     ... 3 more
   Caused by: java.lang.reflect.InvocationTargetException
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
     ... 53 more
   Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
     at org.apache.hudi.utilities.sources.debezium.DebeziumSource.$deserializeLambda$(DebeziumSource.java:64)
     ... 63 more
   ```
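
   The bottom-most cause, `Invalid lambda deserialization` inside `DebeziumSource.$deserializeLambda$`, usually means an executor deserialized a `SerializedLambda` whose capturing class bytecode doesn't match what the driver serialized. In my understanding that most commonly points to mismatched copies of the same class on the driver and executor classpaths (for example, two different `hudi-utilities-bundle` JARs), which would fit our patched-fork setup. As a hedged sketch of the usual workaround (the class name and body below are hypothetical, not Hudi code), a named serializable function class avoids the `SerializedLambda` round-trip entirely:

   ```java
   import org.apache.spark.api.java.function.MapFunction;
   import org.apache.spark.sql.Row;

   // Hypothetical sketch: a named, serializable function class avoids
   // SerializedLambda entirely. When driver and executors disagree on the
   // enclosing class's bytecode, a lambda fails at deserialization with
   // exactly this IllegalArgumentException; a named class instead fails
   // with a clearer error (e.g., a serialVersionUID mismatch).
   public class PassThroughRecordFn implements MapFunction<Row, Row> {
     @Override
     public Row call(Row record) {
       return record; // placeholder body, for illustration only
     }
   }
   ```

   That said, the more likely fix in our case is simply making sure exactly one version of the bundle JAR is visible to both the driver and the executors.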
   
   ## Is This One Really Due To Avro Schema Now?!
   
   As I mentioned, our current Hudi setup is a tad strange/outdated: we use `0.11.1` with some patches/commits applied to it. I **think** the best move on my end would be to:
   
   - Build a JAR with the **latest stable Hudi** (0.13.0) (assuming that's doable with our setup/ecosystem)
   - Try running the **same DeltaStreamer job** with this new JAR and see what happens
   - Post updates here ...
   
   P.S. For reference, this is my spark-submit command:
   
   ```shell
   $ spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
   --master spark://<SPARK_MASTER_URL>:<PORT> \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/directory \
   --target-table <TABLE_NAME> \
   --min-sync-interval-seconds 30 \
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --source-ordering-field _event_lsn \
   --op UPSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
   --hoodie-conf group.id=FOO-deltastreamer \
   --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```

