Thank you for the suggestions.

Neither Kafka nor Flink runs in a Docker container; both run locally.
Furthermore, the same issue happens with the Direct Runner. That being said,
changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a different
error, see attached.
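
As a side note, a minimal stdlib-only sketch of one way to discover the machine's routable IP address to substitute for localhost (the 8.8.8.8 target is arbitrary; a UDP connect() sends no packets, it only asks the OS which local interface it would route through):

```python
import socket

def host_ip():
    """Best-effort lookup of the host's outward-facing IP address."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # UDP connect() transmits nothing; it only selects a local
        # address for this destination.
        s.connect(("8.8.8.8", 80))
        return s.getsockname()[0]
    except OSError:
        # No route (e.g. offline); fall back to hostname resolution,
        # then loopback as a last resort.
        try:
            return socket.gethostbyname(socket.gethostname())
        except OSError:
            return "127.0.0.1"
    finally:
        s.close()

print(f"{host_ip()}:9092")  # candidate bootstrap.servers value
```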

On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy <venkat_pack...@yahoo.com>
wrote:

> Is Kafka itself running inside another container? If so inspect that
> container and see if it has a network alias and add that alias to your
> /etc/hosts file and map it to 127.0.0.1.
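
Venkat's suggestion above could be sketched roughly as below (the container name "kafka" is hypothetical, and the sudo step assumes you want the mapping system-wide; adjust to your setup):

```shell
# Hypothetical container name "kafka"; adjust to your environment.
# 1. Ask Docker which network aliases the broker container has:
ALIAS=$(docker inspect \
  -f '{{range .NetworkSettings.Networks}}{{join .Aliases ","}}{{end}}' \
  kafka 2>/dev/null || true)

# 2. Map the first advertised alias to loopback so the name the broker
#    advertises also resolves on the host machine:
if [ -n "$ALIAS" ]; then
  echo "127.0.0.1 ${ALIAS%%,*}" | sudo tee -a /etc/hosts
fi
```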
>
>
>
> *From:* Chamikara Jayalath <chamik...@google.com>
> *Sent:* Friday, June 5, 2020 2:58 PM
> *To:* Luke Cwik <lc...@google.com>
> *Cc:* user <user@beam.apache.org>; dev <d...@beam.apache.org>; Heejong Lee
> <heej...@google.com>
> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while fetching
> topic metadata
>
>
>
> Is it possible that "localhost:9092" is not available from the Docker
> environment where the Flink step is executed? Can you try specifying
> the actual IP address of the node running the Kafka broker?
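
One quick way to test Chamikara's hypothesis, i.e. whether the broker port is actually reachable from the environment where the pipeline step runs, is a plain TCP connect (stdlib only; the host/port values are examples):

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    # A successful TCP connect confirms the broker's listener is
    # reachable from *this* environment; note that "localhost" inside
    # a Docker container is the container itself, not the host machine.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print(broker_reachable("localhost", 9092))
```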
>
>
>
> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com> wrote:
>
> +dev <d...@beam.apache.org> +Chamikara Jayalath <chamik...@google.com> 
> +Heejong
> Lee <heej...@google.com>
>
>
>
> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <piotr.filip...@gmail.com>
> wrote:
>
> I am unable to read from Kafka and am getting the following warnings and
> errors when calling kafka.ReadFromKafka() (Python SDK):
>
>
>
> WARNING:root:severity: WARN
> timestamp {
>   seconds: 1591370012
>   nanos: 523000000
> }
> message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1
> could not be established. Broker may not be available."
> log_location: "org.apache.kafka.clients.NetworkClient"
> thread: "18"
>
>
>
> Finally, the pipeline fails with:
>
>
>
> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
> java.lang.RuntimeException:
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
> fetching topic metadata
>
>
>
> See more complete log attached.
>
>
>
> The relevant code snippet:
>
>
>
> consumer_conf = {"bootstrap.servers": "localhost:9092"}
>
> ...
>
> kafka.ReadFromKafka(
>     consumer_config=consumer_conf,
>     topics=[args.topic],
> )
>
> ...
>
>
>
> Also see full python script attached.
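
If Chamikara's suggestion above applies, the snippet would change only in the bootstrap address. A sketch with a hypothetical placeholder IP:

```python
# Hypothetical broker address; substitute the actual IP of the node
# running the Kafka broker.
broker_ip = "192.168.1.10"

consumer_conf = {
    # Explicit host IP instead of "localhost", which may resolve to
    # the container itself when the step runs inside Docker.
    "bootstrap.servers": f"{broker_ip}:9092",
}

print(consumer_conf["bootstrap.servers"])
```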
>
>
>
> I am using Beam version 2.21.0 and the DirectRunner. With the Flink Runner
> I am also not able to read from the topic.
>
>
>
> I am using Kafka 2.5.0 and started the broker by following
> https://kafka.apache.org/quickstart, using the default
> config/server.properties.
>
>
>
> Everything runs locally, and I verified that I can publish & consume from
> that topic using the confluent_kafka library.
>
>
>
> --
>
> Best regards,
> Piotr
>
>

-- 
Best regards,
Piotr

Attachment: beam01.log