Hi Stephan, yes, this program uses Beam's KafkaIO.

PCollection<KV<String, String>> kafkarecords = p
    .apply(KafkaIO.read()
        .withBootstrapServers("kafka01:9092")
        .withTopics(topics)
        .withValueCoder(StringUtf8Coder.of())
        .withoutMetadata())
    .apply("startBundle",
        ParDo.of(new DoFn<KV<byte[], String>, KV<String, String>>() {
Thanks + regards,
Amir-

      From: Stephan Ewen <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Friday, September 30, 2016 2:13 PM
 Subject: Re: KafkaIO() Timeout expired while fetching kafka topic metadata
   
Not sure if this is caused by the Flink Kafka Consumer (or if this program uses 
Beam's Kafka IO).
If it is using Flink, there is a requirement that the client that submits the 
job can access Kafka. If it cannot, that may be a cause for such a timeout.
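One quick way to test that hypothesis is to check, from the machine that submits the job, whether a plain TCP connection to the broker can be opened at all. This is a hedged sketch, not a full Kafka metadata request; the default host and port are placeholders taken from the snippet earlier in the thread (kafka01:9092):

```java
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // A metadata fetch fails the same way when this connection cannot be made.
    static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder broker address; substitute the one from the pipeline.
        String host = args.length > 0 ? args[0] : "kafka01";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port
                + (reachable(host, port, 5000) ? " reachable" : " unreachable"));
    }
}
```

If this prints "unreachable" from the submitting client but "reachable" from the cluster nodes, a firewall or DNS issue between the client and Kafka is the likely cause of the timeout.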

On Fri, Sep 30, 2016 at 7:11 PM, amir bahmanyari <[email protected]> wrote:

No more stack trace, Raghu. Keeping the configs as I described, yes, it fails
like this because for some reason the Kafka ZooKeeper gets killed.
Thanks + regards,
Amir-
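Since the ZooKeeper process dying appears to be the trigger here, one minimal liveness probe is ZooKeeper's built-in `ruok` four-letter command, which a healthy server answers with `imok`. The sketch below is not from the original thread; the host and port are placeholders to be replaced with the ZooKeeper node used by the Kafka cluster:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ZkCheck {
    // Sends a ZooKeeper four-letter word (e.g. "ruok") over a raw TCP socket
    // and returns the server's reply. A healthy server replies "imok" to "ruok"
    // and then closes the connection.
    static String fourLetterWord(String host, int port, String word, int timeoutMs)
            throws IOException {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            s.getOutputStream().write(word.getBytes("US-ASCII"));
            s.shutdownOutput();
            // Read until the server closes the stream.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            InputStream in = s.getInputStream();
            byte[] buf = new byte[64];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toString("US-ASCII");
        }
    }

    public static void main(String[] args) throws IOException {
        // Placeholder host/port; substitute the real ZooKeeper node.
        System.out.println(fourLetterWord("zookeeper01", 2181, "ruok", 3000));
    }
}
```

If this throws an IOException or hangs, the ZooKeeper node is down or unreachable, which would explain Kafka's metadata fetch timing out.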

      From: Raghu Angadi <[email protected]>
 To: [email protected] ; amir bahmanyari <[email protected]> 
 Sent: Thursday, September 29, 2016 11:28 PM
 Subject: Re: KafkaIO() Timeout expired while fetching kafka topic metadata
  

On Thu, Sep 29, 2016 at 9:34 PM, amir bahmanyari <[email protected]> wrote:

        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

Is there any more stack trace printed after this line?
Does it always fail? 


   



   
