seethb opened a new issue, #10128:
URL: https://github.com/apache/hudi/issues/10128

   Hi, I have followed the Hudi Kafka Connect instructions from this document 
https://github.com/apache/hudi/blob/master/hudi-kafka-connect/README.md and am 
trying to set up a Hudi sink connector in my local environment.
   
   After submitting the connector config shown below, I get the error that follows it.
   curl -i -X POST -H "Accept:application/json" -H 
"Content-Type:application/json" localhost:8083/connectors/ -d '{
       "name": "hudi-kafka-test",
       "config": {
                   "bootstrap.servers": "172.18.0.5:9092",
                   "connector.class": 
"org.apache.hudi.connect.HoodieSinkConnector",
                   "tasks.max": "1",
                   "topics": "hudi-test-topic",
                   "hoodie.table.name": "hudi-test-topic",
                   "key.converter": 
"org.apache.kafka.connect.storage.StringConverter",
                   "value.converter": 
"org.apache.kafka.connect.storage.StringConverter",
                   "value.converter.schemas.enable": "false",
                   "hoodie.table.type": "MERGE_ON_READ",
                   "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
                   "hoodie.datasource.write.recordkey.field": "volume",
                   "hoodie.datasource.write.partitionpath.field": "date",
                   "hoodie.schemaprovider.class": 
"org.apache.hudi.schema.SchemaRegistryProvider",
                   "hoodie.deltastreamer.schemaprovider.registry.url": 
"http://172.18.0.7:8081/subjects/hudi-test-topic-value/versions/latest";,
                   "hoodie.kafka.commit.interval.secs": 60
         }
   }'
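   Before looking at the failure, to rule out the schema registry I run this sanity check 
against the subject configured above (just fetching the latest schema version):
   
   curl -s http://172.18.0.7:8081/subjects/hudi-test-topic-value/versions/latest
   
   The connect worker log below shows the task failing right after the partition assignment: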
   
   [2023-11-15 11:05:12,065] INFO [hudi-kafka-test|task-0] New partitions added 
[hudi-test-topic-0] (org.apache.hudi.connect.HoodieSinkTask:150)
   [2023-11-15 11:05:12,065] INFO [hudi-kafka-test|task-0] Bootstrap task for 
connector hudi-kafka-test with id null with assignments [hudi-test-topic-0] 
part [hudi-test-topic-0] (org.apache.hudi.connect.HoodieSinkTask:185)
   [2023-11-15 11:05:12,067] INFO [hudi-kafka-test|task-0] Existing partitions 
deleted [hudi-test-topic-0] (org.apache.hudi.connect.HoodieSinkTask:156)
   [2023-11-15 11:05:12,067] ERROR [hudi-kafka-test|task-0] 
WorkerSinkTask{id=hudi-kafka-test-0} Task threw an uncaught and unrecoverable 
exception. Task is being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask:195)
   java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream
        at 
org.apache.hudi.config.metrics.HoodieMetricsConfig.lambda$static$0(HoodieMetricsConfig.java:72)
        at 
org.apache.hudi.common.config.HoodieConfig.setDefaultValue(HoodieConfig.java:86)
        at 
org.apache.hudi.common.config.HoodieConfig.lambda$setDefaults$2(HoodieConfig.java:139)
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
        at 
org.apache.hudi.common.config.HoodieConfig.setDefaults(HoodieConfig.java:135)
        at 
org.apache.hudi.config.metrics.HoodieMetricsConfig.access$100(HoodieMetricsConfig.java:44)
        at 
org.apache.hudi.config.metrics.HoodieMetricsConfig$Builder.build(HoodieMetricsConfig.java:185)
        at 
org.apache.hudi.config.HoodieWriteConfig$Builder.setDefaults(HoodieWriteConfig.java:2937)
        at 
org.apache.hudi.config.HoodieWriteConfig$Builder.build(HoodieWriteConfig.java:3052)
        at 
org.apache.hudi.config.HoodieWriteConfig$Builder.build(HoodieWriteConfig.java:3047)
        at 
org.apache.hudi.connect.writers.KafkaConnectTransactionServices.<init>(KafkaConnectTransactionServices.java:79)
        at 
org.apache.hudi.connect.transaction.ConnectTransactionCoordinator.<init>(ConnectTransactionCoordinator.java:88)
        at 
org.apache.hudi.connect.HoodieSinkTask.bootstrap(HoodieSinkTask.java:191)
        at org.apache.hudi.connect.HoodieSinkTask.open(HoodieSinkTask.java:151)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:637)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:72)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:733)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:311)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:460)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:460)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:371)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:542)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1215)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:472)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.ClassNotFoundException: 
org.apache.hadoop.fs.FSDataInputStream
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 45 more
   [2023-11-15 11:05:12,067] INFO [hudi-kafka-test|task-0] [Consumer 
clientId=connector-consumer-hudi-kafka-test-0, groupId=connect-hudi-kafka-test] 
Revoke previously assigned partitions hudi-test-topic-0 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:325)
   [2023-11-15 11:05:12,067] INFO [hudi-kafka-test|task-0] [Consumer 
clientId=connector-consumer-hudi-kafka-test-0, groupId=connect-hudi-kafka-test] 
Member 
connector-consumer-hudi-kafka-test-0-b692100a-19f6-4f93-b74d-b3aad60c73e0 
sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: 
null) due to the consumer is being closed 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1106)
   [2023-11-15 11:05:12,068] INFO [hudi-kafka-test|task-0] [Consumer 
clientId=connector-consumer-hudi-kafka-test-0, groupId=connect-hudi-kafka-test] 
Resetting generation and member id due to: consumer pro-actively leaving the 
group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:998)
   [2023-11-15 11:05:12,068] INFO [hudi-kafka-test|task-0] [Consumer 
clientId=connector-consumer-hudi-kafka-test-0, groupId=connect-hudi-kafka-test] 
Request joining group due to: consumer pro-actively leaving the group 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1045)
   [2023-11-15 11:05:12,068] INFO [hudi-kafka-test|task-0] Metrics scheduler 
closed (org.apache.kafka.common.metrics.Metrics:659)
   [2023-11-15 11:05:12,068] INFO [hudi-kafka-test|task-0] Closing reporter 
org.apache.kafka.common.metrics.JmxReporter 
(org.apache.kafka.common.metrics.Metrics:663)
   [2023-11-15 11:05:12,068] INFO [hudi-kafka-test|task-0] Metrics reporters 
closed (org.apache.kafka.common.metrics.Metrics:669)
   [2023-11-15 11:05:12,069] INFO [hudi-kafka-test|task-0] App info 
kafka.consumer for connector-consumer-hudi-kafka-test-0 unregistered 
(org.apache.kafka.common.utils.AppInfoParser:83)
   [2023-11-15 11:09:48,011] INFO [AdminClient clientId=adminclient-8] Node -1 
disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,152] INFO [Producer clientId=producer-1] Node -1 
disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,160] INFO [Worker clientId=connect-1, 
groupId=hudi-connect-cluster] Node -1 disconnected. 
(org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,203] INFO [Producer clientId=producer-2] Node -1 
disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,225] INFO [hudi-kafka-test|task-0] [Consumer 
clientId=consumer-hudi-control-group2183cefd-ff0b-4049-80c2-0eb9fcee21b1-4, 
groupId=hudi-control-group2183cefd-ff0b-4049-80c2-0eb9fcee21b1] Node -1 
disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,282] INFO [Producer clientId=producer-3] Node -1 
disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,294] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-2, groupId=hudi-connect-cluster] Node -1 
disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,294] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-1, groupId=hudi-connect-cluster] Node -1 
disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,294] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-2, groupId=hudi-connect-cluster] Node 
2147483646 disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,295] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-1, groupId=hudi-connect-cluster] Node 
2147483646 disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,295] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-2, groupId=hudi-connect-cluster] Group 
coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or 
invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery 
will be attempted. 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:942)
   [2023-11-15 11:13:48,295] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-1, groupId=hudi-connect-cluster] Group 
coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or 
invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery 
will be attempted. 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:942)
   [2023-11-15 11:13:48,383] INFO [hudi-kafka-test|task-0] [Producer 
clientId=producer-4] Node -1 disconnected. 
(org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,548] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] Node -1 
disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,548] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] Node 
2147483646 disconnected. (org.apache.kafka.clients.NetworkClient:935)
   [2023-11-15 11:13:48,548] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] Group 
coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or 
invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery 
will be attempted. 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:942)
   [2023-11-15 11:13:48,796] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-2, groupId=hudi-connect-cluster] 
Discovered group coordinator localhost:9092 (id: 2147483646 rack: null) 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:879)
   [2023-11-15 11:13:48,797] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-1, groupId=hudi-connect-cluster] 
Discovered group coordinator localhost:9092 (id: 2147483646 rack: null) 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:879)
   [2023-11-15 11:13:49,050] INFO [Consumer 
clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] 
Discovered group coordinator localhost:9092 (id: 2147483646 rack: null) 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:879)
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Follow the document linked above; I tried both Hudi versions (0.14.0 and 0.13.1).
   2. Use a connect-distributed.properties like the one below (my plugin directory 
layout is sketched after this list):
   bootstrap.servers=localhost:9092
   group.id=hudi-connect-cluster
   key.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter=org.apache.kafka.connect.json.JsonConverter
   key.converter.schemas.enable=true
   value.converter.schemas.enable=true
   offset.storage.topic=connect-offsets
   offset.storage.replication.factor=1
   config.storage.topic=connect-configs
   config.storage.replication.factor=1
   status.storage.topic=connect-status
   status.storage.replication.factor=1
   
   offset.flush.interval.ms=60000
   listeners=HTTP://:8083
   plugin.path=/usr/local/share/kafka/plugins
   3. Use the same hudi-test-topic from the Kafka Connect demo: 
https://github.com/apache/hudi/tree/master/hudi-kafka-connect/demo
   4. Submit the connector config shown above.
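   
   For reference, the plugin directory under plugin.path looks roughly like this on my 
machine (the bundle jar is built per the README, so the exact file name is a sketch and 
may differ):
   
   ls /usr/local/share/kafka/plugins/lib
   # hudi-kafka-connect-bundle-0.14.0.jar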
   
   **Expected behavior**
   
   The connector should run and the Hudi table should get created locally under 
/tmp/hoodie/hudi-test-topic.
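   
   On a successful bootstrap I would expect Hudi's metadata directory to appear under 
the configured base path, e.g. (a sketch of the check, assuming the default table layout):
   
   ls -la /tmp/hoodie/hudi-test-topic/.hoodie
   # expect hoodie.properties plus timeline files once commits land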
   
   **Environment Description**
   
   * Hudi version : 0.14.0 / 0.13.1
   
   * Spark version : spark-3.2.3
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : Local
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   I have tried both the confluentinc-kafka-connect-hdfs-10.2.4 and 
confluentinc-kafka-connect-hdfs-10.1.0 JARs, but the error is the same.
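   
   The missing class org/apache/hadoop/fs/FSDataInputStream normally lives in 
hadoop-common, so the error suggests the Hadoop filesystem classes are not visible on 
the connector's plugin classpath. A quick check one can run against the bundle jar 
(the path and jar name below follow my local layout and may differ):
   
   # does the Hudi Kafka Connect bundle ship the Hadoop FS classes?
   jar tf /usr/local/share/kafka/plugins/lib/hudi-kafka-connect-bundle-0.14.0.jar \
     | grep 'org/apache/hadoop/fs/FSDataInputStream'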
   
   **Stacktrace**
   
     "id": 3,
         "state": "FAILED",
         "worker_id": "10.12.22.168:8083",
         "trace": "java.lang.NoClassDefFoundError: 
org/apache/hadoop/fs/FSDataInputStream\n\tat 
org.apache.hudi.config.metrics.HoodieMetricsConfig.lambda$static$0(HoodieMetricsConfig.java:73)\n\tat
 
org.apache.hudi.common.config.HoodieConfig.setDefaultValue(HoodieConfig.java:88)\n\tat
 
org.apache.hudi.common.config.HoodieConfig.lambda$setDefaults$2(HoodieConfig.java:130)\n\tat
 java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)\n\tat 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)\n\tat 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)\n\tat 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)\n\tat
 java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)\n\tat 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)\n\tat
 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)\n\tat
 java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)\n\tat 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)\n\tat 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)\n\tat 
org.apache.hudi.common.config.HoodieConfig.setDefaults(HoodieConfig.java:126)\n\tat
 
org.apache.hudi.config.metrics.HoodieMetricsConfig.access$100(HoodieMetricsConfig.java:44)\n\tat
 
org.apache.hudi.config.metrics.HoodieMetricsConfig$Builder.build(HoodieMetricsConfig.java:207)\n\tat
 
org.apache.hudi.config.HoodieWriteConfig$Builder.setDefaults(HoodieWriteConfig.java:3095)\n\tat
 
org.apache.hudi.config.HoodieWriteConfig$Builder.build(HoodieWriteConfig.java:3213)\n\tat
 
org.apache.hudi.config.HoodieWriteConfig$Builder.build(HoodieWriteConfig.java:3208)\n\tat
 
org.apache.hudi.connect.writers.KafkaConnectTransactionServices.<init>(KafkaConnectTransactionServices.java:79)\n\tat
 
org.apache.hudi.connect.transaction.ConnectTransactionCoordinator.<init>(ConnectTransactionCoordinator.java:88)\n\tat
 org.apache.hudi.connect.HoodieSinkTask.bootstrap(HoodieSinkTask.java:191)\n\tat 
org.apache.hudi.connect.HoodieSinkTask.open(HoodieSinkTask.java:151)\n\tat 
org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:637)\n\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:72)\n\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:733)\n\tat
 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:311)\n\tat
 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:460)\n\tat
 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:460)\n\tat
 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:371)\n\tat
 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:542)\n\tat 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)\n\tat
 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)\n\tat
 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1215)\n\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:472)\n\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat
 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
 java.lang.Thread.run(Thread.java:750)\nCaused by: 
java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream\n\tat 
java.net.URLClassLoader.findClass(URLClassLoader.java:387)\n\tat 
java.lang.ClassLoader.loadClass(ClassLoader.java:418)\n\tat 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103)\n\tat
 java.lang.ClassLoader.loadClass(ClassLoader.java:351)\n\t... 45 more\n"
       }
     ],
     "type": "sink"
   
   

