Is it possible that 'localhost:9092' is not reachable from the Docker
environment in which the Flink step is executed? Can you try specifying
the actual IP address of the node running the Kafka broker?
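
One quick way to test this is to run a plain TCP check from inside the same
container or environment that executes the step. The sketch below is only an
illustration: can_reach is a hypothetical helper (not part of Beam or Kafka),
and the host and port are the ones from your snippet.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        # create_connection resolves the host and attempts a TCP connect.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name-resolution failures.
        return False


# Example call, using the address from the original snippet:
# print(can_reach("localhost", 9092))
```

If this returns False inside the container but True on the host, then
'localhost' is resolving to the container itself rather than to the broker
machine. Note that a successful connect only shows the port is open; the
broker's advertised.listeners setting may still hand clients an address they
cannot reach.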

On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com> wrote:

> +dev <d...@beam.apache.org> +Chamikara Jayalath <chamik...@google.com>
> +Heejong Lee <heej...@google.com>
>
> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <piotr.filip...@gmail.com>
> wrote:
>
>> I am unable to read from Kafka and am getting the following warnings and
>> errors when calling kafka.ReadFromKafka() (Python SDK):
>>
>> WARNING:root:severity: WARN
>> timestamp {
>>   seconds: 1591370012
>>   nanos: 523000000
>> }
>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1
>> could not be established. Broker may not be available."
>> log_location: "org.apache.kafka.clients.NetworkClient"
>> thread: "18"
>>
>> Finally, the pipeline fails with:
>>
>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>> java.lang.RuntimeException:
>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
>> fetching topic metadata
>>
>> See the more complete log attached.
>>
>> The relevant code snippet:
>>
>> consumer_conf = {"bootstrap.servers": 'localhost:9092'}
>> ...
>> kafka.ReadFromKafka(
>>     consumer_config=consumer_conf,
>>     topics=[args.topic],
>> )
>> ...
>>
>> Also see the full Python script attached.
>>
>> I am using Beam version 2.21.0 and the DirectRunner. With the Flink runner
>> I am also unable to read from the topic.
>>
>> I am using Kafka 2.5.0 and started the broker by following
>> https://kafka.apache.org/quickstart with the default
>> config/server.properties.
>>
>> Everything runs locally, and I verified that I can publish to and consume
>> from that topic using the confluent_kafka library.
>>
>> --
>> Best regards,
>> Piotr
>>
>
