Hi Sumit,
I diagnosed it. You were right: Kafka was not accessible. I increased ulimit to
an astronomical number and got past the issue.
Maybe we should meet someday and share experiences. I live in the Bay Area.
Cheers,
Amir

Sent from my iPhone
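A limit raised with ulimit in a shell does not always reach JVMs started by an init system, so it can be worth confirming the limit the Flink or Kafka process actually sees from inside the JVM. A minimal sketch of such a check, assuming a HotSpot/OpenJDK runtime on a UNIX-like OS (the class name is illustrative):

    import java.lang.management.ManagementFactory;
    import java.lang.management.OperatingSystemMXBean;

    public class FdLimitCheck {
        public static void main(String[] args) {
            OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
            // On HotSpot/OpenJDK under Linux or macOS, the MXBean also implements
            // com.sun.management.UnixOperatingSystemMXBean, which exposes fd counts.
            if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
                com.sun.management.UnixOperatingSystemMXBean unixOs =
                        (com.sun.management.UnixOperatingSystemMXBean) os;
                System.out.println("open fds: " + unixOs.getOpenFileDescriptorCount());
                System.out.println("max fds:  " + unixOs.getMaxFileDescriptorCount());
            } else {
                System.out.println("No UNIX MXBean here; check `ulimit -n` instead.");
            }
        }
    }

Run it as the same user and through the same launch path as the Flink or Kafka daemon; if "max fds" still shows the old value, the new limit never reached the process.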
> On Oct 1, 2016, at 10:10 PM, Chawla,Sumit <[email protected]> wrote:
>
> I have seen this error when Kafka is not accessible. Can you check
> connectivity to Kafka? Can you read messages from Kafka using the console
> consumer from the machine where you are running the Flink job?
>
> Regards
> Sumit Chawla
>
>
>> On Sat, Oct 1, 2016 at 12:40 PM, amir bahmanyari <[email protected]> wrote:
>> Hi Stephan et al.,
>> I changed ulimit -n on all servers, Flink and Kafka, to 10000 and 16000
>> respectively.
>> Rebooted. Retried. Failed the same way.
>> Then I restarted everything from scratch, with disk cleanup etc.
>> It started fine, and there are no more timeouts from failing to fetch the
>> topic metadata.
>> All of this started when I increased the degree of parallelism and created
>> a Kafka topic with that number of partitions: 2048.
>> It's running now. I am hoping to get better performance from the increased
>> degree of parallelism.
>> Have a great weekend, and I appreciate your feedback.
>> Amir-
>>
>>
>> From: amir bahmanyari <[email protected]>
>> To: "[email protected]" <[email protected]>
>> Sent: Friday, September 30, 2016 2:36 PM
>>
>> Subject: Re: KafkaIO() Timeout expired while fetching kafka topic metadata
>>
>> Hi Stephan,
>> Yes, this program uses Beam's KafkaIO():
>>
>> PCollection<KV<String, String>> kafkarecords = p
>>     .apply(KafkaIO.read().withBootstrapServers("kafka01:9092").withTopics(topics)
>>         .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
>>     .apply("startBundle", ParDo.of(
>>         new DoFn<KV<byte[], String>, KV<String, String>>() {
>>
>> Thanks + regards,
>> Amir-
>>
>>
>> From: Stephan Ewen <[email protected]>
>> To: [email protected]; amir bahmanyari <[email protected]>
>> Sent: Friday, September 30, 2016 2:13 PM
>> Subject: Re: KafkaIO() Timeout expired while fetching kafka topic metadata
>>
>> I am not sure whether this is caused by the Flink Kafka consumer (or
>> whether this program uses Beam's KafkaIO).
>>
>> If it is using Flink, there is a requirement that the client that submits
>> the job can access Kafka. If it cannot, that may be a cause for such a
>> timeout.
>>
>>
>> On Fri, Sep 30, 2016 at 7:11 PM, amir bahmanyari <[email protected]> wrote:
>> No more stack trace, Raghu.
>> Keeping the configs as I described, yes, it fails like this because for
>> some reason the Kafka ZooKeeper process gets killed.
>> Thanks + regards,
>> Amir-
>>
>> From: Raghu Angadi <[email protected]>
>> To: [email protected]; amir bahmanyari <[email protected]>
>> Sent: Thursday, September 29, 2016 11:28 PM
>> Subject: Re: KafkaIO() Timeout expired while fetching kafka topic metadata
>>
>>
>> On Thu, Sep 29, 2016 at 9:34 PM, amir bahmanyari <[email protected]> wrote:
>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired
>> while fetching topic metadata
>>
>> Is there any more stack trace printed after this line?
>>
>> Does it always fail?
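Sumit's connectivity check can also be done programmatically: ask the broker for the topic's partition metadata, which is the same request that is timing out in the stack trace above, and run it from the machine that submits the Flink job. A minimal sketch, assuming the kafka-clients library is on the classpath; the broker address, topic name, and class name are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class TopicMetadataProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka01:9092");
            props.put("group.id", "metadata-probe");
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // partitionsFor() blocks while fetching topic metadata and fails
                // (exact timeout behavior varies by client version) when the
                // broker is unreachable, mirroring the TimeoutException above.
                System.out.println(consumer.partitionsFor("mytopic"));
            }
        }
    }

If this probe fails from the submitting machine but succeeds on the broker host, the problem is network reachability (or the broker's advertised listeners), not Flink or Beam.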

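For context, here is a self-contained sketch of how the truncated pipeline in Amir's message might be rounded out. It mirrors the builder calls from his snippet (a circa-2016 Beam KafkaIO API using withValueCoder); the topic name and the DoFn body are illustrative placeholders, not his actual code:

    import java.util.Collections;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class KafkaReadSketch {
        public static void main(String[] args) {
            Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

            PCollection<KV<String, String>> kafkarecords = p
                // Keys default to byte[]; only the value coder is set, as in the thread.
                .apply(KafkaIO.read()
                    .withBootstrapServers("kafka01:9092")
                    .withTopics(Collections.singletonList("mytopic"))
                    .withValueCoder(StringUtf8Coder.of())
                    .withoutMetadata())
                .apply("startBundle", ParDo.of(
                    new DoFn<KV<byte[], String>, KV<String, String>>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            // Placeholder transform: re-key each record by a
                            // constant string and pass the value through.
                            c.output(KV.of("key", c.element().getValue()));
                        }
                    }));

            p.run();
        }
    }

A topic with 2048 partitions means many broker connections and many open log segments, which is exactly the kind of file-descriptor pressure that makes the ulimit fix at the top of the thread plausible.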