Never mind, I found this thread on the user list:
https://lists.apache.org/thread.html/raeb69afbd820fdf32b3cf0a273060b6b149f80fa49c7414a1bb60528%40%3Cuser.beam.apache.org%3E,
which answers my question.

On Mon, Jul 13, 2020 at 4:10 PM Kamil Wasilewski <
kamil.wasilew...@polidea.com> wrote:

> I'd like to bump this thread since I get the same error when trying to
> read from Kafka in the Python SDK:
>
> *java.lang.UnsupportedOperationException: The ActiveBundle does not have a
> registered bundle checkpoint handler.*
>
> Can someone familiar with cross-language and Flink verify the problem? I
> use the latest Beam master with the following pipeline options:
>
> --runner=FlinkRunner
> --parallelism=2
> --experiment=beam_fn_api
> --environment_type=DOCKER
> --environment_cache_millis=10000
>
> Those are the same options that are used in CrossLanguageKafkaIOTest:
> https://github.com/apache/beam/blob/master/sdks/python/test-suites/portable/common.gradle#L114
> Speaking of which, is there a specific reason why reading from Kafka is
> not yet being tested by Jenkins?
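>
> In case it helps with reproducing, this is roughly how I pass these
> options from Python (a minimal sketch; the flags are copied verbatim from
> the list above):
>
> from apache_beam.options.pipeline_options import PipelineOptions
>
> options = PipelineOptions([
>     "--runner=FlinkRunner",
>     "--parallelism=2",
>     "--experiment=beam_fn_api",
>     "--environment_type=DOCKER",
>     "--environment_cache_millis=10000",
> ])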
>
> Thanks,
> Kamil
>
> On Thu, Jun 18, 2020 at 11:35 PM Piotr Filipiuk <piotr.filip...@gmail.com>
> wrote:
>
>> Thank you for clarifying.
>>
>> Would you mind clarifying whether the issues that I experience running
>> Kafka IO on Flink (or DirectRunner for testing) are specific to my setup,
>> or whether this setup is not yet fully functional for the Python SDK?
>>
>> On Thu, Jun 18, 2020 at 12:03 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Beam does not have a concept of general availability. Kafka IO is
>>> released with Beam, so it is available. Some of the APIs used by Kafka IO
>>> are experimental, so they are subject to change (though that is less
>>> likely at this point).
>>> Various runners may offer their own levels of availability for
>>> cross-language transforms.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>> On Thu, Jun 18, 2020 at 11:26 AM Piotr Filipiuk <
>>> piotr.filip...@gmail.com> wrote:
>>>
>>>> I also wanted to clarify: is Kafka IO for the Python SDK generally
>>>> available, or is it still experimental?
>>>>
>>>> On Fri, Jun 12, 2020 at 2:52 PM Piotr Filipiuk <
>>>> piotr.filip...@gmail.com> wrote:
>>>>
>>>>> For completeness, I am also attaching the task manager logs.
>>>>>
>>>>> On Fri, Jun 12, 2020 at 2:32 PM Piotr Filipiuk <
>>>>> piotr.filip...@gmail.com> wrote:
>>>>>
>>>>>> Thank you for clarifying.
>>>>>>
>>>>>> I attempted to use FlinkRunner with 2.22 and I am
>>>>>> getting the following error, which I am not sure how to debug:
>>>>>>
>>>>>> ERROR:root:java.lang.UnsupportedOperationException: The ActiveBundle
>>>>>> does not have a registered bundle checkpoint handler.
>>>>>> INFO:apache_beam.runners.portability.portable_runner:Job state
>>>>>> changed to FAILED
>>>>>> Starting pipeline kafka = 192.168.1.219:9092, topic = piotr-test
>>>>>> Traceback (most recent call last):
>>>>>>   File "apache_beam/examples/streaming_wordcount_kafka.py", line 73,
>>>>>> in <module>
>>>>>>     run()
>>>>>>   File "apache_beam/examples/streaming_wordcount_kafka.py", line 68,
>>>>>> in run
>>>>>>     | "WriteUserScoreSums" >> beam.io.WriteToText(args.output)
>>>>>>   File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>>>>> line 547, in __exit__
>>>>>>     self.run().wait_until_finish()
>>>>>>   File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>>>> line 583, in wait_until_finish
>>>>>>     raise self._runtime_exception
>>>>>> RuntimeError: Pipeline
>>>>>> BeamApp-piotr0filipiuk-0612212344-159c632b_45b947f8-bc64-4aad-a96c-6fb2bd834d60
>>>>>> failed in state FAILED: java.lang.UnsupportedOperationException: The
>>>>>> ActiveBundle does not have a registered bundle checkpoint handler.
>>>>>>
>>>>>> My setup is (everything runs locally):
>>>>>> Beam Version: 2.22.0.
>>>>>> Kafka 2.5.0 (https://kafka.apache.org/quickstart - using default
>>>>>> config/server.properties)
>>>>>> Flink 1.10 (
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/tutorials/local_setup.html
>>>>>> )
>>>>>>
>>>>>> I run the pipeline using the following command:
>>>>>>
>>>>>> python apache_beam/examples/streaming_wordcount_kafka.py
>>>>>> --bootstrap_servers=192.168.1.219:9092 --topic=piotr-test
>>>>>> --runner=FlinkRunner --flink_version=1.10 --flink_master=
>>>>>> 192.168.1.219:8081 --environment_type=LOOPBACK
>>>>>>
>>>>>> I can see the following error in the logs:
>>>>>>
>>>>>> ERROR:apache_beam.runners.worker.data_plane:Failed to read inputs in
>>>>>> the data plane.
>>>>>> Traceback (most recent call last):
>>>>>>   File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
>>>>>> line 528, in _read_inputs
>>>>>>     for elements in elements_iterator:
>>>>>>   File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
>>>>>> line 416, in __next__
>>>>>>     return self._next()
>>>>>>   File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
>>>>>> line 689, in _next
>>>>>>     raise self
>>>>>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of
>>>>>> RPC that terminated with:
>>>>>>         status = StatusCode.UNAVAILABLE
>>>>>>         details = "DNS resolution failed"
>>>>>>         debug_error_string =
>>>>>> "{"created":"@1591997030.613849000","description":"Failed to pick
>>>>>> subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3941,"referenced_errors":[{"created":"@1591997030.613847000","description":"Resolver
>>>>>> transient
>>>>>> failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":262,"referenced_errors":[{"created":"@1591997030.613847000","description":"DNS
>>>>>> resolution
>>>>>> failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":370,"grpc_status":14,"referenced_errors":[{"created":"@1591997030.613840000","description":"C-ares
>>>>>> status is not ARES_SUCCESS: Misformatted domain
>>>>>> name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244,"referenced_errors":[{"created":"@1591997030.613728000","description":"C-ares
>>>>>> status is not ARES_SUCCESS: Misformatted domain
>>>>>> name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244}]}]}]}]}"
>>>>>> >
>>>>>>
>>>>>> I thought that might be the culprit; however, the same error also
>>>>>> appears when running the wordcount.py example
>>>>>> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py>,
>>>>>> which succeeds. The error appears only with Flink 1.10, not with Flink 1.9.
>>>>>>
>>>>>> Full log attached.
>>>>>>
>>>>>> I would appreciate help and suggestions on how to proceed.
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 8, 2020 at 5:49 PM Heejong Lee <heej...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> DirectRunner is not well-tested for xlang transforms, and you need
>>>>>>> to specify the jar_packages experimental flag for Java dependencies when
>>>>>>> running from the Python SDK. I'd recommend using 2.22 + FlinkRunner for
>>>>>>> xlang pipelines.
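>>>>>>>
>>>>>>> For example, passing the flag would look roughly like this (a
>>>>>>> sketch; the jar path is a placeholder for the Java dependencies the
>>>>>>> cross-language Kafka transform needs, and depends on your setup):
>>>>>>>
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>
>>>>>>> options = PipelineOptions([
>>>>>>>     "--runner=DirectRunner",
>>>>>>>     # Placeholder path; point at your locally available Kafka IO jar.
>>>>>>>     "--experiments=jar_packages=/path/to/beam-sdks-java-io-kafka.jar",
>>>>>>> ])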
>>>>>>>
>>>>>>> On Mon, Jun 8, 2020 at 3:27 PM Chamikara Jayalath <
>>>>>>> chamik...@google.com> wrote:
>>>>>>>
>>>>>>>> To clarify, the Kafka dependency was already available as an embedded
>>>>>>>> dependency in the Java SDK Harness, but I'm not sure whether that
>>>>>>>> worked for DirectRunner. Starting with 2.22, we'll be staging
>>>>>>>> dependencies from the environment during pipeline submission.
>>>>>>>>
>>>>>>>> On Mon, Jun 8, 2020 at 3:23 PM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>> It seems like the Java dependency is not being properly set up when
>>>>>>>>> running the cross-language Kafka step. I don't think this was
>>>>>>>>> available for Beam 2.21. Can you try with the latest Beam HEAD, or
>>>>>>>>> with Beam 2.22 when it's released?
>>>>>>>>> +Heejong Lee <heej...@google.com>
>>>>>>>>>
>>>>>>>>> On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk <
>>>>>>>>> piotr.filip...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Pasting the error inline:
>>>>>>>>>>
>>>>>>>>>> ERROR:root:severity: ERROR
>>>>>>>>>> timestamp {
>>>>>>>>>>   seconds: 1591405163
>>>>>>>>>>   nanos: 815000000
>>>>>>>>>> }
>>>>>>>>>> message: "Client failed to dequeue and process the value"
>>>>>>>>>> trace: "org.apache.beam.sdk.util.UserCodeException:
>>>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>>>> org/springframework/expression/EvaluationContext\n\tat
>>>>>>>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>>>>>>>>>> Source)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>>>>>>>>>> Source)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>>>>>>>>> Source)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>>>>>>>>>> java.lang.Thread.run(Thread.java:748)\nCaused by:
>>>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>>>> org/springframework/expression/EvaluationContext\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\nCaused
>>>>>>>>>> by: java.lang.ClassNotFoundException:
>>>>>>>>>> org.springframework.expression.EvaluationContext\n\tat
>>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat
>>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat
>>>>>>>>>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat
>>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>>>>>>>>>> Source)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>>>>>>>>>> Source)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>>>>>>>>> Source)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>>>>>>>>>> java.lang.Thread.run(Thread.java:748)\n"
>>>>>>>>>> log_location:
>>>>>>>>>> "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk <
>>>>>>>>>> piotr.filip...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you for the suggestions.
>>>>>>>>>>>
>>>>>>>>>>> Neither Kafka nor Flink runs in a Docker container; they both run
>>>>>>>>>>> locally. Furthermore, the same issue happens with DirectRunner. That
>>>>>>>>>>> being said, changing from localhost:$PORT to $IP_ADDR:$PORT resulted
>>>>>>>>>>> in a different error; see attached.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy <
>>>>>>>>>>> venkat_pack...@yahoo.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Is Kafka itself running inside another container? If so, inspect
>>>>>>>>>>>> that container to see whether it has a network alias; if it does,
>>>>>>>>>>>> add that alias to your /etc/hosts file and map it to 127.0.0.1.
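>>>>>>>>>>>>
>>>>>>>>>>>> For example, if the container's alias were kafka-broker (a
>>>>>>>>>>>> hypothetical name), the /etc/hosts entry would look like:
>>>>>>>>>>>>
>>>>>>>>>>>> 127.0.0.1    kafka-broker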
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> *From:* Chamikara Jayalath <chamik...@google.com>
>>>>>>>>>>>> *Sent:* Friday, June 5, 2020 2:58 PM
>>>>>>>>>>>> *To:* Luke Cwik <lc...@google.com>
>>>>>>>>>>>> *Cc:* user <user@beam.apache.org>; dev <d...@beam.apache.org>;
>>>>>>>>>>>> Heejong Lee <heej...@google.com>
>>>>>>>>>>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while
>>>>>>>>>>>> fetching topic metadata
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Is it possible that 'localhost:9092' is not available from the
>>>>>>>>>>>> Docker environment where the Flink step is executed? Can you try
>>>>>>>>>>>> specifying the actual IP address of the node running the Kafka
>>>>>>>>>>>> broker?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> +dev <d...@beam.apache.org> +Chamikara Jayalath
>>>>>>>>>>>> <chamik...@google.com> +Heejong Lee <heej...@google.com>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <
>>>>>>>>>>>> piotr.filip...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> I am unable to read from Kafka and am getting the following
>>>>>>>>>>>> warnings & errors when calling kafka.ReadFromKafka() (Python SDK):
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> WARNING:root:severity: WARN
>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>   seconds: 1591370012
>>>>>>>>>>>>   nanos: 523000000
>>>>>>>>>>>> }
>>>>>>>>>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection
>>>>>>>>>>>> to node -1 could not be established. Broker may not be available."
>>>>>>>>>>>> log_location: "org.apache.kafka.clients.NetworkClient"
>>>>>>>>>>>> thread: "18"
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Finally the pipeline fails with:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>>>>>>>>>> java.lang.RuntimeException:
>>>>>>>>>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired 
>>>>>>>>>>>> while
>>>>>>>>>>>> fetching topic metadata
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> See more complete log attached.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The relevant code snippet:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'}
>>>>>>>>>>>>
>>>>>>>>>>>> ...
>>>>>>>>>>>>
>>>>>>>>>>>> kafka.ReadFromKafka(
>>>>>>>>>>>>     consumer_config=consumer_conf,
>>>>>>>>>>>>     topics=[args.topic],
>>>>>>>>>>>> )
>>>>>>>>>>>>
>>>>>>>>>>>> ...
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Also see the full Python script attached.
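>>>>>>>>>>>>
>>>>>>>>>>>> For context, the overall shape of the pipeline is roughly the
>>>>>>>>>>>> following (a trimmed sketch of the attached script; the print step
>>>>>>>>>>>> stands in for the real transforms, and the kafka import path may
>>>>>>>>>>>> differ between Beam versions):
>>>>>>>>>>>>
>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>> from apache_beam.io import kafka
>>>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>>>
>>>>>>>>>>>> options = PipelineOptions(["--streaming"])
>>>>>>>>>>>> with beam.Pipeline(options=options) as p:
>>>>>>>>>>>>     (p
>>>>>>>>>>>>      | kafka.ReadFromKafka(
>>>>>>>>>>>>            consumer_config={"bootstrap.servers": "localhost:9092"},
>>>>>>>>>>>>            topics=["my-topic"])  # placeholder topic
>>>>>>>>>>>>      | beam.Map(print))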
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I am using Beam version 2.21.0 and DirectRunner. With the
>>>>>>>>>>>> FlinkRunner I am also not able to read from the topic.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I am using Kafka 2.5.0 and started the broker by following
>>>>>>>>>>>> https://kafka.apache.org/quickstart, using the default
>>>>>>>>>>>> config/server.properties.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Everything runs locally, and I verified that I can publish to and
>>>>>>>>>>>> consume from that topic using the confluent_kafka library.
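>>>>>>>>>>>>
>>>>>>>>>>>> The sanity check was along these lines (a sketch using
>>>>>>>>>>>> confluent_kafka; the topic and group id are placeholders):
>>>>>>>>>>>>
>>>>>>>>>>>> from confluent_kafka import Consumer, Producer
>>>>>>>>>>>>
>>>>>>>>>>>> producer = Producer({"bootstrap.servers": "localhost:9092"})
>>>>>>>>>>>> producer.produce("piotr-test", b"hello")
>>>>>>>>>>>> producer.flush()  # block until the message is delivered
>>>>>>>>>>>>
>>>>>>>>>>>> consumer = Consumer({
>>>>>>>>>>>>     "bootstrap.servers": "localhost:9092",
>>>>>>>>>>>>     "group.id": "sanity-check",  # placeholder group id
>>>>>>>>>>>>     "auto.offset.reset": "earliest",
>>>>>>>>>>>> })
>>>>>>>>>>>> consumer.subscribe(["piotr-test"])
>>>>>>>>>>>> msg = consumer.poll(10.0)  # wait up to 10s for one record
>>>>>>>>>>>> print(msg.value() if msg is not None else "no message")
>>>>>>>>>>>> consumer.close()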
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>
>>>>>>>>>>>> Best regards,
>>>>>>>>>>>> Piotr
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Piotr
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards,
>>>>>>>>>> Piotr
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Piotr
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Piotr
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Piotr
>>>>
>>>
>>
>> --
>> Best regards,
>> Piotr
>>
>
