Gautam,

If the MSK version (which I'm assuming is AWS MSK) is compatible with kafka-client 0.8, then this looks like it might be an authentication issue. There are some details in this post: https://stackoverflow.com/questions/48164748/kafka-java-io-eofexception-networkreceive-readfromreadablechannel
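One way to test that theory is to check which kind of listener the bootstrap servers point at. As far as I know, the 0.8 client has no SSL support, so a plaintext request sent to a TLS listener would likely just be closed by the broker, which could surface as the EOFException. The Python sketch below is only an illustration: the port mapping assumes MSK's default listeners (9092 plaintext, 9094 TLS), and the helper names and broker hostnames are hypothetical.

```python
import socket
import ssl

# Assumption: Amazon MSK's default listener ports.
MSK_PORTS = {9092: "PLAINTEXT", 9094: "TLS"}


def parse_bootstrap_servers(servers):
    """Split 'host1:port1,host2:port2' into a list of (host, port) tuples."""
    pairs = []
    for entry in servers.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def expected_protocol(port):
    """Guess the listener type from the port number (defaults only)."""
    return MSK_PORTS.get(port, "UNKNOWN")


def speaks_tls(host, port, timeout=5.0):
    """Return True if a TLS handshake succeeds against host:port.

    Certificate checks are disabled because this only probes the
    transport; do not reuse this context for real traffic.
    """
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    try:
        with socket.create_connection((host, port), timeout=timeout) as raw:
            with context.wrap_socket(raw, server_hostname=host):
                return True
    except OSError:  # covers ssl.SSLError, timeouts, refused connections
        return False


if __name__ == "__main__":
    # Hypothetical broker list; substitute the value passed to the job.
    brokers = "b-1.example.internal:9094,b-2.example.internal:9094"
    for host, port in parse_bootstrap_servers(brokers):
        print(host, port, expected_protocol(port))
```

If the port maps to TLS, or `speaks_tls` returns True when run from a machine that can reach the cluster, pointing the job at the plaintext listener (if the cluster has one enabled) would be the first thing I'd try.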
Thanks,
Nishith

On Mon, Sep 30, 2019 at 12:39 PM Gautam Nayak <[email protected]> wrote:

> When executing a HoodieDeltaStreamer Job, We run into this below exception. I see hudi-utilities-bundle packaged with kafka-0.8 client libs, but believe it should be compatible with the MSK version of Kafka. Any pointers what the issue could be?
>
> Spark - 2.2.1
> Kafka – MSK 2.1.0
>
> 19/09/30 19:10:56 main INFO SimpleConsumer: Reconnect due to error:
> java.io.EOFException
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
>     at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>     at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:111)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:133)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:132)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:365)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:361)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>     at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:361)
>     at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:132)
>     at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:119)
>     at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:196)
>     at org.apache.hudi.utilities.sources.AvroKafkaSource.fetchNewData(AvroKafkaSource.java:55)
>     at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:72)
>     at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:62)
>     at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:299)
>     at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:218)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:123)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:926)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:204)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:229)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 19/09/30 19:10:57 main INFO KafkaOffsetGen: HUDI -- getNextOffsetRanges -- topic name is KAFKA_TEST
>
> Thanks,
> Gautam
