Hi Colleagues,

I created a Kafka topic with 2048 partitions. kafka-topics.sh --describe lists everything accurately and it's open for business.
I start the Flink cluster. Everything comes up normally.
I start my Beam app that uses KafkaIO() ... it waits for a while and then throws:

org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

The degree of parallelism is 1024. It works with a lower degree of parallelism, such as 512, and 1024 Kafka partitions.
Line 581 in my code is just p.run();

And this is how KafkaIO.read looks:

PCollection<KV<String, String>> kafkarecords = p
    .apply(KafkaIO.read().withBootstrapServers("kafka01:9092").withTopics(topics)
        .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
    .apply("startBundle", ParDo.of(
        new DoFn<KV<byte[], String>, KV<String, String>>() {.....etc.
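
To put the numbers above in perspective, here is a minimal sketch of how 2048 partitions could be spread over 1024 parallel readers. The round-robin assignment and the names PartitionSpread and assign are assumptions made for illustration only. They are not taken from KafkaIO or the Flink runner, and the sketch does not reproduce the timeout.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionSpread {
    // Hypothetical helper: assigns partition ids 0..numPartitions-1
    // round-robin to numSplits readers (an assumed strategy).
    static List<List<Integer>> assign(int numPartitions, int numSplits) {
        List<List<Integer>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            splits.get(p % numSplits).add(p);
        }
        return splits;
    }

    public static void main(String[] args) {
        // The two configurations mentioned in this post:
        // {partitions, degree of parallelism}.
        int[][] cases = {{2048, 1024}, {1024, 512}};
        for (int[] c : cases) {
            List<List<Integer>> splits = assign(c[0], c[1]);
            System.out.println(c[0] + " partitions over " + c[1]
                + " readers: " + splits.get(0).size()
                + " partitions per reader");
        }
    }
}
```

Under this assumption both configurations give each reader two partitions, so the failing and working setups differ in the total number of partitions and readers, not in the per-reader load.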

Thanks for your help.
Amir-


...ADEBUG Completed method
...ADEBUG about to run pipeline
...ADEBUG Running thread  threw:  java.lang.RuntimeException: Error while translating UnboundedSource: org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource@24b52d3e
        at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:288)
        at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:247)
        at org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106)
        at org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:226)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:293)
        at org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:106)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:105)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
        at benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:581)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
