Hi Sumit 
I diagnosed it.
You are right, Kafka was not accessible.
I increased ulimit to an astronomical number and got past the issue.
Maybe we should meet someday and share experiences.
I live in the Bay Area.
Cheers 
Amir
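
For anyone hitting the same timeout, a quick way to rule out plain network problems is to test whether the machine that submits the job can open a TCP connection to the broker. The sketch below is only an illustration and not part of the original job: the class name KafkaReachability and the method isReachable are hypothetical, and the default host and port are taken from the "kafka01:9092" bootstrap server quoted further down in this thread. It uses only the JDK, not the Kafka client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Beam, Flink, or Kafka.
final class KafkaReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // connect() throws an IOException subtype on refusal, timeout,
            // or an unresolvable host name; all of those count as "not reachable".
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions based on the bootstrap server in this thread.
        String host = args.length > 0 ? args[0] : "kafka01";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        boolean ok = isReachable(host, port, 3000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

A successful connection only shows that the port is open from that machine. It does not prove that a metadata fetch will succeed, so reading a few messages with the console consumer, as Sumit suggested, remains the more complete test.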



> On Oct 1, 2016, at 10:10 PM, Chawla,Sumit <[email protected]> wrote:
> 
> I have seen this error when Kafka is not accessible. Can you check 
> connectivity to Kafka? Can you read messages from Kafka using the console 
> consumer from the machine where you are running the Flink job?
> 
> Regards
> Sumit Chawla
> 
> 
>> On Sat, Oct 1, 2016 at 12:40 PM, amir bahmanyari <[email protected]> wrote:
>> Hi Stephan et al.
>> I changed ulimit -n on all servers, Flink and Kafka, to 10000 and 16000 
>> respectively.
>> Rebooted. Retried. It failed the same way.
>> Then I restarted everything from scratch, including a disk cleanup.
>> It started fine, with no more timeouts from failing to fetch the topic 
>> metadata.
>> All of this began when I increased the degree of parallelism and created a 
>> Kafka topic with that number of partitions: 2048.
>> It's running now. I am hoping to get better performance from the higher 
>> degree of parallelism.
>> Have a great weekend, and I appreciate your feedback...
>> Amir-
>> 
>> 
>> From: amir bahmanyari <[email protected]>
>> To: "[email protected]" <[email protected]> 
>> Sent: Friday, September 30, 2016 2:36 PM
>> 
>> Subject: Re: KafkaIO() Timeout expired while fetching kafka topic metadata
>> 
>> Hi Stephan,
>> yes, this program uses Beam's KafkaIO().
>> 
>> PCollection<KV<String, String>> kafkarecords = p
>>     .apply(KafkaIO.read()
>>         .withBootstrapServers("kafka01:9092")
>>         .withTopics(topics)
>>         .withValueCoder(StringUtf8Coder.of())
>>         .withoutMetadata())
>>     .apply("startBundle", ParDo.of(
>>         new DoFn<KV<byte[], String>, KV<String, String>>() {
>> 
>> Thanks+regards,
>> Amir-
>> 
>> 
>> From: Stephan Ewen <[email protected]>
>> To: [email protected]; amir bahmanyari <[email protected]> 
>> Sent: Friday, September 30, 2016 2:13 PM
>> Subject: Re: KafkaIO() Timeout expired while fetching kafka topic metadata
>> 
>> Not sure if this is caused by the Flink Kafka Consumer (or if this program 
>> uses Beam's Kafka IO).
>> 
>> If it is using Flink, there is a requirement that the client that submits 
>> the job can access Kafka. If it cannot, that may be a cause for such a 
>> timeout.
>> 
>> 
>> On Fri, Sep 30, 2016 at 7:11 PM, amir bahmanyari <[email protected]> wrote:
>> No more stack trace Raghu.
>> Keeping the configs like I described, yes it fails like this since for some 
>> reason the Kafka Zookeeper gets killed.
>> Thanks +regards
>> Amir-
>> 
>> From: Raghu Angadi <[email protected]>
>> To: [email protected] ; amir bahmanyari <[email protected]> 
>> Sent: Thursday, September 29, 2016 11:28 PM
>> Subject: Re: KafkaIO() Timeout expired while fetching kafka topic metadata
>> 
>> 
>> On Thu, Sep 29, 2016 at 9:34 PM, amir bahmanyari <[email protected]> wrote:
>>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired 
>> while fetching topic metadata
>> 
>> Is there any more stacktrace printed after this line?
>> 
>> Does it always fail? 
> 
