Hi colleagues,

I have been running KafkaIO successfully in my Beam app until now. I rebuilt and repackaged it with mvn clean package, and all of a sudden it throws the exception below:

SchemaException: Error reading field 'topic_metadata': Error reading array of size 420978, only 34 bytes available

Any ideas, please? Thanks for your attention.
/opt/analytics/apache/flink-1.0.0/bin/flink run \
  /opt/maven305/dataflow-test/target/dataflow-test-1.0.jar \
  --topic lrdata \
  --bootstrap.servers kafkahost:9092 \
  --zookeeper.connect kafkahost:2181 \
  --group.id myGroup
.....................Completed method...about to run pipeline...
Running thread threw:
java.lang.RuntimeException: Error while translating UnboundedSource: org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource@6e5bfdfc
        at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:283)
        at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:244)
        at org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:108)
        at org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:89)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
        at org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
        at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:61)  // this is the p.run() statement
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 420978, only 34 bytes available
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
        at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:178)
        at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:205)
        at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1272)
        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:574)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.<init>(UnboundedSourceWrapper.java:131)
        at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:280)
        ... 25 more
