Hi Colleagues,I created a kafka topic with 2048 partitions. kafka-topics.sh
--describe lists everything accurately and its open for business.I start Flibk
Cluster. Everything come up normal.I start my Beam app that uses KafkaIO()
...it waits for a while and then
throws:org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata
The deg of parallelism is 1024. Works with lower degree of parallelism like 512
& 1024 kafka partitions.Line 581 in my code is just p.run();
And this how KafkaIO.read looks like:PCollection<KV<String, String>>
kafkarecords = p
.apply(KafkaIO.read().withBootstrapServers("kafka01:9092").withTopics(topics)
.withValueCoder(StringUtf8Coder.of()).withoutMetadata()) .apply("startBundle",
ParDo.of( new DoFn<KV<byte[], String>, KV<String, String>>() {.....etc.
Thanks for your help.Amir-
...ADEBUG Completed method...ADEBUG about to run pipeline...ADEBUG Running
thread threw: java.lang.RuntimeException: Error while translating
UnboundedSource:
org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource@24b52d3e at
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:288)
at
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:247)
at
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106)
at
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:226)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at
org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:293) at
org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:106)
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:105)
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183) at
benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:581)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)Caused
by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata