vbogretsov opened a new issue, #11026: URL: https://github.com/apache/hudi/issues/11026
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
- Join the mailing list to engage in conversations and get faster support at dev-subscr...@hudi.apache.org.
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

I'm getting a dependency error on the Spark executor when running `HoodieMultiTableStreamer` via the Spark Operator in Kubernetes:

```
24/04/15 19:22:20 ERROR HoodieMultiTableStreamer: error while running MultiTableDeltaStreamer for table: my_table1
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.NoSuchMethodError: 'void org.apache.hudi.common.util.HoodieCommonKryoRegistrar.registerClasses(com.esotericsoftware.kryo.Kryo)'
java.lang.NoSuchMethodError: 'void org.apache.hudi.common.util.HoodieCommonKryoRegistrar.registerClasses(com.esotericsoftware.kryo.Kryo)'
...
```

**To Reproduce**

Steps to reproduce the behavior:

1. `kubectl apply -f config.yaml` (provided below)
2. `kubectl -n dp2 logs -l app=hudidemo -f` (to get the logs)

**Expected behavior**

The mentioned dependency error does not appear in the logs and does not cause the Hudi Streamer to fail.

**Environment Description**

* Hudi version : 0.14.1
* Spark version : 3.4.0
* Hive version : none (I use `AwsGlueCatalogSyncTool` with the AWS Glue Data Catalog instead)
* Hadoop version : 3.3.4
* Spark Operator version : v1beta2-1.3.7-3.1.1
* Kubernetes version : 1.29 (AWS EKS)
* Storage (HDFS/S3/GCS..) : S3
* AWS SDK version : 1.12.682
* AWS SDK 2 version : 2.25.13
* Running on Docker? (yes/no) : yes

**Additional context**

I used the following Dockerfile to build the image `myimage:1.0.0`:

```
ARG BASE=apache/spark:3.4.0
ARG MVNROOT=https://maven-central-eu.storage-download.googleapis.com/maven2

FROM alpine:3.19 as aws-jars
ARG MVNROOT
ARG AWSSDK1=1.12.682
ARG AWSSDK2=2.25.13
WORKDIR /jars
RUN wget ${MVNROOT}/com/amazonaws/aws-java-sdk-bundle/${AWSSDK1}/aws-java-sdk-bundle-${AWSSDK1}.jar
RUN wget ${MVNROOT}/software/amazon/awssdk/bundle/${AWSSDK2}/bundle-${AWSSDK2}.jar

FROM alpine:3.19 as hadoop-jars
ARG MVNROOT
ARG HADOOP=3.3.4
WORKDIR /jars
RUN wget ${MVNROOT}/org/apache/hadoop/hadoop-common/${HADOOP}/hadoop-common-${HADOOP}.jar
RUN wget ${MVNROOT}/org/apache/hadoop/hadoop-aws/${HADOOP}/hadoop-aws-${HADOOP}.jar
RUN wget ${MVNROOT}/org/apache/hadoop/hadoop-mapreduce-client-core/${HADOOP}/hadoop-mapreduce-client-core-${HADOOP}.jar

FROM alpine:3.19 as hudi-jars
ARG MVNROOT
ARG SCALA=2.12
ARG HUDI=0.14.1
WORKDIR /jars
RUN wget ${MVNROOT}/org/apache/hudi/hudi-spark3.4-bundle_${SCALA}/${HUDI}/hudi-spark3.4-bundle_${SCALA}-${HUDI}.jar
RUN wget ${MVNROOT}/org/apache/hudi/hudi-aws/${HUDI}/hudi-aws-${HUDI}.jar
RUN wget ${MVNROOT}/org/apache/hudi/hudi-sync-common/${HUDI}/hudi-sync-common-${HUDI}.jar
RUN wget ${MVNROOT}/org/apache/hudi/hudi-hive-sync-bundle/${HUDI}/hudi-hive-sync-bundle-${HUDI}.jar
RUN wget ${MVNROOT}/org/apache/hudi/hudi-utilities-bundle_${SCALA}/${HUDI}/hudi-utilities-bundle_${SCALA}-${HUDI}.jar
RUN wget ${MVNROOT}/org/apache/hudi/hudi-hadoop-mr-bundle/${HUDI}/hudi-hadoop-mr-bundle-${HUDI}.jar

FROM ${BASE} as final
COPY --from=aws-jars /jars /opt/spark/jars
COPY --from=hadoop-jars /jars /opt/spark/jars
COPY --from=hudi-jars /jars /opt/spark/jars
ENV HOME=/opt/spark
ENV PATH=/opt/spark/bin:$PATH
ENV HUDI_CONF_DIR=/etc/hudi
RUN mkdir -p /opt/spark/tmp
```

I can confirm that this image works locally when executed in Docker Compose with exactly the same command-line arguments.
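Note that both `hudi-spark3.4-bundle` and `hudi-utilities-bundle` above are fat jars that contain the `hudi-common` classes, so the image likely ends up with more than one copy of `HoodieCommonKryoRegistrar` on the classpath. Here is a minimal sketch to confirm which jars ship the class named in the error (a hypothetical diagnostic; it assumes `unzip` is available inside the container):

```
# List every jar on the Spark classpath that ships the class from the
# NoSuchMethodError; more than one hit means JVM classpath order decides
# which copy actually gets loaded. Assumes `unzip` is installed.
cd /opt/spark/jars
for jar in *.jar; do
  if unzip -l "$jar" 2>/dev/null \
      | grep -q 'org/apache/hudi/common/util/HoodieCommonKryoRegistrar.class'; then
    echo "$jar"
  fi
done
```

If more than one jar prints, whichever copy the classloader finds first wins, which could explain why the same image behaves differently between Docker Compose and the Kubernetes cluster.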
My Spark Operator configuration is the following:

```
apiVersion: v1
kind: ConfigMap
metadata:
  name: etc-hudi
  namespace: dp2
data:
  hudi-defaults.conf: |
    hoodie.upsert.shuffle.parallelism=8
    hoodie.insert.shuffle.parallelism=8
    hoodie.delete.shuffle.parallelism=8
    hoodie.bulkinsert.shuffle.parallelism=8
  base.properties: |
    hoodie.parquet.small.file.limit=16777216
    hoodie.index.type=GLOBAL_BLOOM
    hoodie.bloom.index.update.partition.path=true
    hoodie.datasource.write.hive_style_partitioning=false
    hoodie.datasource.hive_sync.enable=true
    hoodie.datasource.hive_sync.database=hudidemo
    hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
    hoodie.datasource.hive_sync.use_jdbc=false
    hoodie.datasource.hive_sync.mode=hms
    hoodie.streamer.ingestion.tablesToBeIngested=myapp.my_table1,myapp.my_table2
    hoodie.streamer.ingestion.myapp.my_table1.configFile=/etc/hudi/myapp.my_table1.properties
    hoodie.streamer.ingestion.myapp.my_table2.configFile=/etc/hudi/myapp.my_table2.properties
  myapp.my_table1.properties: |
    include=base.properties
    hoodie.datasource.write.keygenerator.type=SIMPLE
    hoodie.datasource.write.recordkey.field=id
    hoodie.datasource.write.precombine.field=__version
    hoodie.datasource.write.partitionpath.field=__gen
    hoodie.datasource.hive_sync.table=myapp_my_table1
    hoodie.streamer.source.dfs.root=s3a://mybucket/hudidemo/raw/myapp/myapp.public.my_table1/
  myapp.my_table2.properties: |
    include=base.properties
    hoodie.datasource.write.keygenerator.type=SIMPLE
    hoodie.datasource.write.recordkey.field=id
    hoodie.datasource.write.precombine.field=__version
    hoodie.datasource.write.partitionpath.field=__gen
    hoodie.datasource.hive_sync.table=myapp_my_table2
    hoodie.streamer.source.dfs.root=s3a://mybucket/hudidemo/raw/myapp/myapp.public.my_table2/
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: dfs-props
  namespace: dp2
data:
  dfs-source.properties: |
    # empty
---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: hudidemo
  namespace: dp2
spec:
  timeToLiveSeconds: 600
  type: Java
  mode: cluster
  image: myimage:1.0.0
  imagePullPolicy: Always
  imagePullSecrets:
    - gitlab-datalake
  sparkVersion: 3.4.0
  mainApplicationFile: local:///opt/spark/jars/hudi-utilities-bundle_2.12-0.14.1.jar
  mainClass: org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer
  arguments:
    - --props
    - /etc/hudi/base.properties
    - --config-folder
    - /etc/hudi
    - --base-path-prefix
    - s3a://mybucket/hudidemo/hudi/
    - --table-type
    - COPY_ON_WRITE
    - --source-class
    - org.apache.hudi.utilities.sources.ParquetDFSSource
    - --source-ordering-field
    - __version
    - --op
    - UPSERT
    - --enable-sync
    - --sync-tool-classes
    - org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool
  memoryOverheadFactor: "0.4"
  sparkConf:
    spark.kubernetes.file.upload.path: s3a://mybucket/_spark
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    spark.kryo.registrator: org.apache.spark.HoodieSparkKryoRegistrar
  hadoopConf:
    fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    metastore.client.factory.class: com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
  restartPolicy:
    type: Always
  dynamicAllocation:
    enabled: true
    minExecutors: 1
    maxExecutors: 16
  volumes:
    - name: etc-hudi
      configMap:
        name: etc-hudi
    - name: dfs-props
      configMap:
        name: dfs-props
  driver:
    image: myimage:1.0.0
    annotations:
      "karpenter.sh/do-not-disrupt": "true"
    serviceAccount: hudidemo
    nodeSelector:
      nodetype: worker
    tolerations:
      - key: nodetype
        value: worker
        effect: NoSchedule
    labels:
      app: hudidemo
      version: 3.4.0
    memory: 4g
    env:
      - name: "AWS_DEFAULT_REGION"
        value: my-region-1
      - name: "AWS_REGION"
        value: my-region-1
      - name: "AWS_WEB_IDENTITY_TOKEN_FILE"
        value: "/var/run/secrets/eks.amazonaws.com/serviceaccount/token"
    volumeMounts:
      - mountPath: /etc/hudi
        name: etc-hudi
      - mountPath: /opt/spark/work-dir/src/test/resources/streamer-config/dfs-source.properties
        name: dfs-props
  executor:
    image: myimage:1.0.0
    annotations:
      "karpenter.sh/do-not-evict": "true"
    nodeSelector:
      nodetype: worker
    tolerations:
      - key: nodetype
        value: worker
        effect: NoSchedule
    labels:
      app: hudidemo
      version: 3.4.0
    memory: 4g
```

I also tried running without the `spark.serializer` and `spark.kryo.registrator` configuration parameters, but got the same result.

**Stacktrace**

```
24/04/15 19:22:20 ERROR HoodieMultiTableStreamer: error while running MultiTableDeltaStreamer for table: my_table1
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.NoSuchMethodError: 'void org.apache.hudi.common.util.HoodieCommonKryoRegistrar.registerClasses(com.esotericsoftware.kryo.Kryo)'
java.lang.NoSuchMethodError: 'void org.apache.hudi.common.util.HoodieCommonKryoRegistrar.registerClasses(com.esotericsoftware.kryo.Kryo)'
    at org.apache.spark.HoodieSparkKryoRegistrar.registerClasses(HoodieSparkKryoRegistrar.scala:53)
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$8(KryoSerializer.scala:182)
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$8$adapted(KryoSerializer.scala:182)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:182)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:240)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:173)
    at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:104)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
    at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:111)
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:351)
    at org.apache.spark.serializer.KryoSerializationStream.<init>(KryoSerializer.scala:271)
    at org.apache.spark.serializer.KryoSerializerInstance.serializeStream(KryoSerializer.scala:437)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:356)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:160)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1548)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1530)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1535)
    at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1353)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1295)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2931)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1545)
    at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1353)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1295)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2931)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
    at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:73)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:476)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:132)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:78)
    at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:208)
    at scala.Option.orElse(Option.scala:447)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:205)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:407)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:563)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:562)
    at org.apache.hudi.utilities.sources.ParquetDFSSource.fromFiles(ParquetDFSSource.java:55)
    at org.apache.hudi.utilities.sources.ParquetDFSSource.lambda$fetchNextBatch$0(ParquetDFSSource.java:50)
    at org.apache.hudi.common.util.Option.map(Option.java:108)
    at org.apache.hudi.utilities.sources.ParquetDFSSource.fetchNextBatch(ParquetDFSSource.java:50)
    at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:44)
    at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
    at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:161)
    at org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:629)
    at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:525)
    at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:498)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:404)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:850)
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:207)
    at org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer.sync(HoodieMultiTableStreamer.java:456)
    at org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer.main(HoodieMultiTableStreamer.java:281)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoSuchMethodError: 'void org.apache.hudi.common.util.HoodieCommonKryoRegistrar.registerClasses(com.esotericsoftware.kryo.Kryo)'
    at org.apache.spark.HoodieSparkKryoRegistrar.registerClasses(HoodieSparkKryoRegistrar.scala:53)
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$8(KryoSerializer.scala:182)
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$8$adapted(KryoSerializer.scala:182)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:182)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:240)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:173)
    at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:104)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
    at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:111)
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:351)
    at org.apache.spark.serializer.KryoSerializationStream.<init>(KryoSerializer.scala:271)
    at org.apache.spark.serializer.KryoSerializerInstance.serializeStream(KryoSerializer.scala:437)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:356)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:160)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1548)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1530)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1535)
    at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1353)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1295)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2931)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
```
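Since the error quotes the full method signature, the copy of `HoodieCommonKryoRegistrar` that actually gets loaded apparently declares `registerClasses` against a different `Kryo` type (possibly a shaded/relocated one) than `HoodieSparkKryoRegistrar` was compiled against. A sketch to compare the signature across the two bundles that should carry the class (per the check above), assuming a JDK with `javap` is available in the container:

```
# Print the registerClasses signature as compiled into each bundle; a relocated
# Kryo parameter type (e.g. org.apache.hudi.com.esotericsoftware.kryo.Kryo, a
# hypothetical shaded name) would explain the NoSuchMethodError.
cd /opt/spark/jars
for jar in hudi-spark3.4-bundle_2.12-0.14.1.jar hudi-utilities-bundle_2.12-0.14.1.jar; do
  echo "== $jar"
  javap -classpath "$jar" org.apache.hudi.common.util.HoodieCommonKryoRegistrar \
    | grep registerClasses
done
```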