seethb opened a new issue, #10128: URL: https://github.com/apache/hudi/issues/10128
Hi, I have followed the Hudi Kafka Connect instructions from this document https://github.com/apache/hudi/blob/master/hudi-kafka-connect/README.md and am trying to set up a Hudi Sink connector in my local environment. After submitting the connector config (shown below), I am getting the error below. curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "hudi-kafka-test", "config": { "bootstrap.servers": "172.18.0.5:9092", "connector.class": "org.apache.hudi.connect.HoodieSinkConnector", "tasks.max": "1", "topics": "hudi-test-topic", "hoodie.table.name": "hudi-test-topic", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter.schemas.enable": "false", "hoodie.table.type": "MERGE_ON_READ", "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic", "hoodie.datasource.write.recordkey.field": "volume", "hoodie.datasource.write.partitionpath.field": "date", "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider", "hoodie.deltastreamer.schemaprovider.registry.url": "http://172.18.0.7:8081/subjects/hudi-test-topic-value/versions/latest", "hoodie.kafka.commit.interval.secs": 60 } }' SubscriptionState:399) [2023-11-15 11:05:12,065] INFO [hudi-kafka-test|task-0] New partitions added [hudi-test-topic-0] (org.apache.hudi.connect.HoodieSinkTask:150) [2023-11-15 11:05:12,065] INFO [hudi-kafka-test|task-0] Bootstrap task for connector hudi-kafka-test with id null with assignments [hudi-test-topic-0] part [hudi-test-topic-0] (org.apache.hudi.connect.HoodieSinkTask:185) [2023-11-15 11:05:12,067] INFO [hudi-kafka-test|task-0] Existing partitions deleted [hudi-test-topic-0] (org.apache.hudi.connect.HoodieSinkTask:156) [2023-11-15 11:05:12,067] ERROR [hudi-kafka-test|task-0] WorkerSinkTask{id=hudi-kafka-test-0} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195) java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream at org.apache.hudi.config.metrics.HoodieMetricsConfig.lambda$static$0(HoodieMetricsConfig.java:72) at org.apache.hudi.common.config.HoodieConfig.setDefaultValue(HoodieConfig.java:86) at org.apache.hudi.common.config.HoodieConfig.lambda$setDefaults$2(HoodieConfig.java:139) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at org.apache.hudi.common.config.HoodieConfig.setDefaults(HoodieConfig.java:135) at org.apache.hudi.config.metrics.HoodieMetricsConfig.access$100(HoodieMetricsConfig.java:44) at org.apache.hudi.config.metrics.HoodieMetricsConfig$Builder.build(HoodieMetricsConfig.java:185) at org.apache.hudi.config.HoodieWriteConfig$Builder.setDefaults(HoodieWriteConfig.java:2937) at org.apache.hudi.config.HoodieWriteConfig$Builder.build(HoodieWriteConfig.java:3052) at org.apache.hudi.config.HoodieWriteConfig$Builder.build(HoodieWriteConfig.java:3047) at org.apache.hudi.connect.writers.KafkaConnectTransactionServices.<init>(KafkaConnectTransactionServices.java:79) at 
org.apache.hudi.connect.transaction.ConnectTransactionCoordinator.<init>(ConnectTransactionCoordinator.java:88) at org.apache.hudi.connect.HoodieSinkTask.bootstrap(HoodieSinkTask.java:191) at org.apache.hudi.connect.HoodieSinkTask.open(HoodieSinkTask.java:151) at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:637) at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:72) at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:733) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:311) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:460) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:460) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:371) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:542) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1215) at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:472) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream at java.net.URLClassLoader.findClass(URLClassLoader.java:387) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 45 more [2023-11-15 11:05:12,067] INFO [hudi-kafka-test|task-0] [Consumer clientId=connector-consumer-hudi-kafka-test-0, groupId=connect-hudi-kafka-test] Revoke previously assigned partitions hudi-test-topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:325) [2023-11-15 11:05:12,067] INFO [hudi-kafka-test|task-0] [Consumer clientId=connector-consumer-hudi-kafka-test-0, groupId=connect-hudi-kafka-test] Member connector-consumer-hudi-kafka-test-0-b692100a-19f6-4f93-b74d-b3aad60c73e0 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1106) [2023-11-15 11:05:12,068] INFO [hudi-kafka-test|task-0] [Consumer clientId=connector-consumer-hudi-kafka-test-0, groupId=connect-hudi-kafka-test] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:998) [2023-11-15 11:05:12,068] INFO [hudi-kafka-test|task-0] [Consumer clientId=connector-consumer-hudi-kafka-test-0, groupId=connect-hudi-kafka-test] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1045) [2023-11-15 11:05:12,068] INFO [hudi-kafka-test|task-0] Metrics scheduler closed 
(org.apache.kafka.common.metrics.Metrics:659) [2023-11-15 11:05:12,068] INFO [hudi-kafka-test|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:663) [2023-11-15 11:05:12,068] INFO [hudi-kafka-test|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:669) [2023-11-15 11:05:12,069] INFO [hudi-kafka-test|task-0] App info kafka.consumer for connector-consumer-hudi-kafka-test-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83) [2023-11-15 11:09:48,011] INFO [AdminClient clientId=adminclient-8] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,152] INFO [Producer clientId=producer-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,160] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,203] INFO [Producer clientId=producer-2] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,225] INFO [hudi-kafka-test|task-0] [Consumer clientId=consumer-hudi-control-group2183cefd-ff0b-4049-80c2-0eb9fcee21b1-4, groupId=hudi-control-group2183cefd-ff0b-4049-80c2-0eb9fcee21b1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,282] INFO [Producer clientId=producer-3] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,294] INFO [Consumer clientId=consumer-hudi-connect-cluster-2, groupId=hudi-connect-cluster] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,294] INFO [Consumer clientId=consumer-hudi-connect-cluster-1, groupId=hudi-connect-cluster] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,294] INFO [Consumer clientId=consumer-hudi-connect-cluster-2, groupId=hudi-connect-cluster] Node 2147483646 disconnected. 
(org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,295] INFO [Consumer clientId=consumer-hudi-connect-cluster-1, groupId=hudi-connect-cluster] Node 2147483646 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,295] INFO [Consumer clientId=consumer-hudi-connect-cluster-2, groupId=hudi-connect-cluster] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:942) [2023-11-15 11:13:48,295] INFO [Consumer clientId=consumer-hudi-connect-cluster-1, groupId=hudi-connect-cluster] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:942) [2023-11-15 11:13:48,383] INFO [hudi-kafka-test|task-0] [Producer clientId=producer-4] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,548] INFO [Consumer clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,548] INFO [Consumer clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] Node 2147483646 disconnected. (org.apache.kafka.clients.NetworkClient:935) [2023-11-15 11:13:48,548] INFO [Consumer clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted. 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:942) [2023-11-15 11:13:48,796] INFO [Consumer clientId=consumer-hudi-connect-cluster-2, groupId=hudi-connect-cluster] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:879) [2023-11-15 11:13:48,797] INFO [Consumer clientId=consumer-hudi-connect-cluster-1, groupId=hudi-connect-cluster] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:879) [2023-11-15 11:13:49,050] INFO [Consumer clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:879) **To Reproduce** Steps to reproduce the behavior: 1. Follow the same document linked above; try with both versions of Hudi (0.14.0 or 0.13.1). 2. Keep the connect-distributed.properties like below: bootstrap.servers=localhost:9092 group.id=hudi-connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.topic=connect-offsets offset.storage.replication.factor=1 config.storage.topic=connect-configs config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 offset.flush.interval.ms=60000 listeners=HTTP://:8083 plugin.path=/usr/local/share/kafka/plugins 3. Please try the same hudi-test-topic given in the Kafka demo https://github.com/apache/hudi/tree/master/hudi-kafka-connect/demo 4. Submit the Kafka connector config mentioned above. 
**Expected behavior** It should work, and I want to see the Hudi table get created in my local /tmp/hoodie/hudi-test-topic **Environment Description** * Hudi version : 0.14/0.13.1 * Spark version : spark-3.2.3 * Hive version : N/A * Hadoop version : N/A * Storage (HDFS/S3/GCS..) : Local * Running on Docker? (yes/no) : no **Additional context** I have tried both confluentinc-kafka-connect-hdfs-10.2.4 and confluentinc-kafka-connect-hdfs-10.1.0 JARs but the error is the same. **Stacktrace** "id": 3, "state": "FAILED", "worker_id": "10.12.22.168:8083", "trace": "java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream\n\tat org.apache.hudi.config.metrics.HoodieMetricsConfig.lambda$static$0(HoodieMetricsConfig.java:73)\n\tat org.apache.hudi.common.config.HoodieConfig.setDefaultValue(HoodieConfig.java:88)\n\tat org.apache.hudi.common.config.HoodieConfig.lambda$setDefaults$2(HoodieConfig.java:130)\n\tat java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)\n\tat java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)\n\tat java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)\n\tat java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)\n\tat java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)\n\tat java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)\n\tat java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)\n\tat java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)\n\tat java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)\n\tat java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)\n\tat org.apache.hudi.common.config.HoodieConfig.setDefaults(HoodieConfig.java:126)\n\tat org.apache.hudi.config.metrics.HoodieMetricsConfig.access$100(HoodieMetricsConfig.java:44)\n\tat 
org.apache.hudi.config.metrics.HoodieMetricsConfig$Builder.build(HoodieMetricsConfig.java:207)\n\tat org.apache.hudi.config.HoodieWriteConfig$Builder.setDefaults(HoodieWriteConfig.java:3095)\n\tat org.apache.hudi.config.HoodieWriteConfig$Builder.build(HoodieWriteConfig.java:3213)\n\tat org.apache.hudi.config.HoodieWriteConfig$Builder.build(HoodieWriteConfig.java:3208)\n\tat org.apache.hudi.connect.writers.KafkaConnectTransactionServices.<init>(KafkaConnectTransactionServices.java:79)\n\tat org.apache.hudi.connect.transaction.ConnectTransactionCoordinator.<init>(ConnectTransactionCoordinator.java:88)\n\tat org.apache.hudi.connect.HoodieSinkTask.bootstrap(HoodieSinkTask.java:191)\n\tat org.apache.hudi.connect.HoodieSinkTask.open(HoodieSinkTask.java:151)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:637)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:72)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:733)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:311)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:460)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:460)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:371)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:542)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1215)\n\tat 
org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:472)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:387)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:418)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:351)\n\t... 45 more\n" } ], "type": "sink" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org