Hi Raghu,Thanks for your reply.No, same issue with Kafka 0901.I rebuilt/rerun
everything and still same issue.Thanks again.
From: Raghu Angadi <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Monday, May 23, 2016 3:59 PM
Subject: Re: Error reading field 'topic_metadata'
Strange. may be the issue is similar to one here :
https://github.com/apache/incubator-beam/pull/374
Can you fix your kafka version to 0.9.0.1 as in above pull request?
On Mon, May 23, 2016 at 3:32 PM, amir bahmanyari <[email protected]> wrote:
Hi Colleagues,I have been successfully running KafkaIO in my Beam app thus
far.I mvn clean rebuilt/packaged it and all of the sudden it now throws this
exception below.SchemaException: Error reading field 'topic_metadata': Error
reading array of size 420978, only 34 bytes available
Any idea pls? Thanks for your attention.
/opt/analytics/apache/flink-1.0.0/bin/flink run
/opt/maven305/dataflow-test/target/dataflow-test-1.0.jar --topic lrdata
--bootstrap.servers kafkahost:9092 --zookeeper.connect kafkahost:2181
--group.id myGroup
.....................Completed method...about to run pipeline...Running thread
threw: java.lang.RuntimeException: Error while translating UnboundedSource:
org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource@6e5bfdfc at
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:283)
at
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:244)
at
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:108)
at
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:89)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
at
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
at
org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292) at
org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
at
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
at
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182) at
benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:61) // this
is p.run() statement at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)Caused
by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field
'topic_metadata': Error reading array of size 420978, only 34 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
at
org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
at
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:178)
at
org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:205)
at
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1272)
at
org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:574)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.<init>(UnboundedSourceWrapper.java:131)
at
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:280)
... 25 more