Never mind, I found this thread on the user list, which answers my question: https://lists.apache.org/thread.html/raeb69afbd820fdf32b3cf0a273060b6b149f80fa49c7414a1bb60528%40%3Cuser.beam.apache.org%3E
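For reference, the kind of pipeline being debugged in the thread below looks roughly like this. It is only a sketch: the broker address, topic, and Flink master are the example values quoted further down, and the options mirror the FlinkRunner invocation used in the thread rather than a verified configuration.

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions

# Example values taken from the thread; substitute your own broker, topic,
# and Flink master address.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.10",
    "--flink_master=192.168.1.219:8081",
    "--environment_type=LOOPBACK",
])

with beam.Pipeline(options=options) as pipeline:
    _ = (
        pipeline
        | "ReadFromKafka" >> kafka.ReadFromKafka(
            consumer_config={"bootstrap.servers": "192.168.1.219:9092"},
            topics=["piotr-test"],
        )
        | "Print" >> beam.Map(print)
    )

Printing an unbounded Kafka stream is just a smoke test; the example discussed in the thread writes word counts to text files instead.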
On Mon, Jul 13, 2020 at 4:10 PM Kamil Wasilewski < kamil.wasilew...@polidea.com> wrote: > I'd like to bump this thread up since I get the same error when trying to > read from Kafka in Python SDK: > > *java.lang.UnsupportedOperationException: The ActiveBundle does not have a > registered bundle checkpoint handler.* > > Can someone familiar with cross-language and Flink verify the problem? I > use the latest Beam master with the following pipeline options: > > --runner=FlinkRunner > --parallelism=2 > --experiment=beam_fn_api > --environment_type=DOCKER > --environment_cache_millis=10000 > > Those are the same options which are used in CrossLanguageKafkaIOTest: > https://github.com/apache/beam/blob/master/sdks/python/test-suites/portable/common.gradle#L114 > Speaking of which, is there a specific reason why reading from Kafka is not > yet being tested by Jenkins at the moment? > > Thanks, > Kamil > > On Thu, Jun 18, 2020 at 11:35 PM Piotr Filipiuk <piotr.filip...@gmail.com> > wrote: > >> Thank you for clarifying. >> >> Would you mind clarifying whether the issues that I experience running >> Kafka IO on Flink (or DirectRunner for testing) specific to my setup etc. >> or this setup is not yet fully functional (for Python SDK)? >> >> On Thu, Jun 18, 2020 at 12:03 PM Chamikara Jayalath <chamik...@google.com> >> wrote: >> >>> Beam does not have a concept of general availability. It's released with >>> Beam so available. Some of the APIs used by Kafka are experimental so are >>> subject to change (but less likely at this point). >>> Various runners may offer their own levels of availability for >>> cross-language transforms. >>> >>> Thanks, >>> Cham >>> >>> >>> On Thu, Jun 18, 2020 at 11:26 AM Piotr Filipiuk < >>> piotr.filip...@gmail.com> wrote: >>> >>>> I also wanted to clarify whether Kafka IO for Python SDK is general >>>> availability or is it still experimental? >>>> >>>> On Fri, Jun 12, 2020 at 2:52 PM Piotr Filipiuk < >>>> piotr.filip...@gmail.com> wrote: >>>> >>>>> For completeness I am also attaching task manager logs. >>>>> >>>>> On Fri, Jun 12, 2020 at 2:32 PM Piotr Filipiuk < >>>>> piotr.filip...@gmail.com> wrote: >>>>> >>>>>> Thank you for clarifying. >>>>>> >>>>>> I attempted to use FlinkRunner with 2.22 and I am >>>>>> getting the following error, which I am not sure how to debug: >>>>>> >>>>>> ERROR:root:java.lang.UnsupportedOperationException: The ActiveBundle >>>>>> does not have a registered bundle checkpoint handler. 
>>>>>> INFO:apache_beam.runners.portability.portable_runner:Job state >>>>>> changed to FAILED >>>>>> Starting pipeline kafka = 192.168.1.219:9092, topic = piotr-test >>>>>> Traceback (most recent call last): >>>>>> File "apache_beam/examples/streaming_wordcount_kafka.py", line 73, >>>>>> in <module> >>>>>> run() >>>>>> File "apache_beam/examples/streaming_wordcount_kafka.py", line 68, >>>>>> in run >>>>>> | "WriteUserScoreSums" >> beam.io.WriteToText(args.output) >>>>>> File >>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>>> line 547, in __exit__ >>>>>> self.run().wait_until_finish() >>>>>> File >>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", >>>>>> line 583, in wait_until_finish >>>>>> raise self._runtime_exception >>>>>> RuntimeError: Pipeline >>>>>> BeamApp-piotr0filipiuk-0612212344-159c632b_45b947f8-bc64-4aad-a96c-6fb2bd834d60 >>>>>> failed in state FAILED: java.lang.UnsupportedOperationException: The >>>>>> ActiveBundle does not have a registered bundle checkpoint handler. >>>>>> >>>>>> My setup is (everything runs locally): >>>>>> Beam Version: 2.22.0. >>>>>> Kafka 2.5.0 (https://kafka.apache.org/quickstart - using default >>>>>> config/server.properties) >>>>>> Flink 1.10 ( >>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/tutorials/local_setup.html >>>>>> ) >>>>>> >>>>>> I run the pipeline using the following command: >>>>>> >>>>>> python apache_beam/examples/streaming_wordcount_kafka.py >>>>>> --bootstrap_servers=192.168.1.219:9092 --topic=piotr-test >>>>>> --runner=FlinkRunner --flink_version=1.10 --flink_master= >>>>>> 192.168.1.219:8081 --environment_type=LOOPBACK >>>>>> >>>>>> I can see the following error in the logs: >>>>>> >>>>>> ERROR:apache_beam.runners.worker.data_plane:Failed to read inputs in >>>>>> the data plane. 
>>>>>> Traceback (most recent call last): >>>>>> File >>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", >>>>>> line 528, in _read_inputs >>>>>> for elements in elements_iterator: >>>>>> File >>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", >>>>>> line 416, in __next__ >>>>>> return self._next() >>>>>> File >>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", >>>>>> line 689, in _next >>>>>> raise self >>>>>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of >>>>>> RPC that terminated with: >>>>>> status = StatusCode.UNAVAILABLE >>>>>> details = "DNS resolution failed" >>>>>> debug_error_string = >>>>>> "{"created":"@1591997030.613849000","description":"Failed to pick >>>>>> subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3941,"referenced_errors":[{"created":"@1591997030.613847000","description":"Resolver >>>>>> transient >>>>>> failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":262,"referenced_errors":[{"created":"@1591997030.613847000","description":"DNS >>>>>> resolution >>>>>> failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":370,"grpc_status":14,"referenced_errors":[{"created":"@1591997030.613840000","description":"C-ares >>>>>> status is not ARES_SUCCESS: Misformatted domain >>>>>> name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244,"referenced_errors":[{"created":"@1591997030.613728000","description":"C-ares >>>>>> status is not ARES_SUCCESS: Misformatted domain >>>>>> name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244}]}]}]}]}" >>>>>> > >>>>>> >>>>>> Which I thought might be a culprit, however it also happens when >>>>>> running the wordcount.py example >>>>>> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py> >>>>>> that succeeds. That error appears only for Flink 1.10, not for Flink 1.9. >>>>>> >>>>>> Full log attached. >>>>>> >>>>>> I would appreciate help and suggestions on how to proceed. >>>>>> >>>>>> >>>>>> On Mon, Jun 8, 2020 at 5:49 PM Heejong Lee <heej...@google.com> >>>>>> wrote: >>>>>> >>>>>>> DirectRunner is not well-tested for xlang transforms and you need to >>>>>>> specify jar_packages experimental flag for Java dependencies from Python >>>>>>> SDK. I'd recommend using 2.22 + FlinkRunner for xlang pipelines. >>>>>>> >>>>>>> On Mon, Jun 8, 2020 at 3:27 PM Chamikara Jayalath < >>>>>>> chamik...@google.com> wrote: >>>>>>> >>>>>>>> To clarify, Kafka dependency was already available as an embedded >>>>>>>> dependency in Java SDK Harness but not sure if this worked for >>>>>>>> DirectRunner. starting 2.22 we'll be staging dependencies from the >>>>>>>> environment during pipeline submission. >>>>>>>> >>>>>>>> On Mon, Jun 8, 2020 at 3:23 PM Chamikara Jayalath < >>>>>>>> chamik...@google.com> wrote: >>>>>>>> >>>>>>>>> Seems like Java dependency is not being properly set up when >>>>>>>>> running the cross-language Kafka step. I don't think this was >>>>>>>>> available for >>>>>>>>> Beam 2.21. Can you try with the latest Beam HEAD or Beam 2.22 when >>>>>>>>> it's >>>>>>>>> released ? 
>>>>>>>>> +Heejong Lee <heej...@google.com> >>>>>>>>> >>>>>>>>> On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk < >>>>>>>>> piotr.filip...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Pasting the error inline: >>>>>>>>>> >>>>>>>>>> ERROR:root:severity: ERROR >>>>>>>>>> timestamp { >>>>>>>>>> seconds: 1591405163 >>>>>>>>>> nanos: 815000000 >>>>>>>>>> } >>>>>>>>>> message: "Client failed to dequeue and process the value" >>>>>>>>>> trace: "org.apache.beam.sdk.util.UserCodeException: >>>>>>>>>> java.lang.NoClassDefFoundError: >>>>>>>>>> org/springframework/expression/EvaluationContext\n\tat >>>>>>>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>>>>>>>>> Source)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>>>>>>>> Source)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>>>>>>>> Source)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat 
>>>>>>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat >>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat >>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat >>>>>>>>>> java.lang.Thread.run(Thread.java:748)\nCaused by: >>>>>>>>>> java.lang.NoClassDefFoundError: >>>>>>>>>> org/springframework/expression/EvaluationContext\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\nCaused >>>>>>>>>> by: java.lang.ClassNotFoundException: >>>>>>>>>> org.springframework.expression.EvaluationContext\n\tat >>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat >>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat >>>>>>>>>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat >>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>>>>>>>>> Source)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>>>>>>>> Source)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>>>>>> 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>>>>>>>> Source)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat >>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat >>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat >>>>>>>>>> java.lang.Thread.run(Thread.java:748)\n" >>>>>>>>>> log_location: >>>>>>>>>> "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient" >>>>>>>>>> >>>>>>>>>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk < >>>>>>>>>> piotr.filip...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Thank you for the suggestions. >>>>>>>>>>> >>>>>>>>>>> Neither Kafka nor Flink run in a docker container, they all run >>>>>>>>>>> locally. Furthermore, the same issue happens for Direct Runner. >>>>>>>>>>> That being >>>>>>>>>>> said changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a >>>>>>>>>>> different error, see attached. >>>>>>>>>>> >>>>>>>>>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy < >>>>>>>>>>> venkat_pack...@yahoo.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Is Kafka itself running inside another container? If so inspect >>>>>>>>>>>> that container and see if it has a network alias and add that >>>>>>>>>>>> alias to your >>>>>>>>>>>> /etc/hosts file and map it to 127.0.0.1. 
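To spell out the addressing advice above and in the quoted message below: when the Kafka read is expanded and executed inside a Docker-based SDK harness, "localhost" resolves to the container itself rather than to the machine running the broker. A hedged illustration, with placeholder values:

# Instead of "localhost:9092", give the consumer an address that is reachable
# from wherever the Kafka read actually runs, e.g. the host's routable IP
# (placeholder value):
consumer_conf = {"bootstrap.servers": "192.168.1.219:9092"}

# Alternatively, if the broker runs in a container and advertises an alias
# such as "kafka" (hypothetical name), map that alias to 127.0.0.1 in
# /etc/hosts on the machine where the harness runs:
#
#     127.0.0.1   kafka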
>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> *From:* Chamikara Jayalath <chamik...@google.com> >>>>>>>>>>>> *Sent:* Friday, June 5, 2020 2:58 PM >>>>>>>>>>>> *To:* Luke Cwik <lc...@google.com> >>>>>>>>>>>> *Cc:* user <user@beam.apache.org>; dev <d...@beam.apache.org>; >>>>>>>>>>>> Heejong Lee <heej...@google.com> >>>>>>>>>>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while >>>>>>>>>>>> fetching topic metadata >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Is it possible that "'localhost:9092'" is not available from >>>>>>>>>>>> the Docker environment where the Flink step is executed from ? Can >>>>>>>>>>>> you try >>>>>>>>>>>> specifying the actual IP address of the node running the Kafka >>>>>>>>>>>> broker ? >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> +dev <d...@beam.apache.org> +Chamikara Jayalath >>>>>>>>>>>> <chamik...@google.com> +Heejong Lee <heej...@google.com> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk < >>>>>>>>>>>> piotr.filip...@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>> I am unable to read from Kafka and getting the following >>>>>>>>>>>> warnings & errors when calling kafka.ReadFromKafka() (Python SDK): >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> WARNING:root:severity: WARN >>>>>>>>>>>> timestamp { >>>>>>>>>>>> seconds: 1591370012 >>>>>>>>>>>> nanos: 523000000 >>>>>>>>>>>> } >>>>>>>>>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection >>>>>>>>>>>> to node -1 could not be established. Broker may not be available." >>>>>>>>>>>> log_location: "org.apache.kafka.clients.NetworkClient" >>>>>>>>>>>> thread: "18" >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Finally the pipeline fails with: >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: >>>>>>>>>>>> java.lang.RuntimeException: >>>>>>>>>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired >>>>>>>>>>>> while >>>>>>>>>>>> fetching topic metadata >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> See more complete log attached. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> The relevant code snippet: >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'} >>>>>>>>>>>> >>>>>>>>>>>> ... >>>>>>>>>>>> >>>>>>>>>>>> kafka.ReadFromKafka( >>>>>>>>>>>> consumer_config=consumer_conf, >>>>>>>>>>>> topics=[args.topic], >>>>>>>>>>>> >>>>>>>>>>>> ) >>>>>>>>>>>> >>>>>>>>>>>> ... >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Also see full python script attached. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> I am using Beam Version: 2.21.0 and DirectRunner. For Flink >>>>>>>>>>>> Runner I am also not able to read from topic. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> I am using kafka 2.5.0 and started the broker by following >>>>>>>>>>>> https://kafka.apache.org/quickstart - using default >>>>>>>>>>>> config/server.properties. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Everything runs locally, and I verified that I can >>>>>>>>>>>> publish&consume from that topic using confluent_kafka library. 
>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> >>>>>>>>>>>> Best regards, >>>>>>>>>>>> Piotr >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Best regards, >>>>>>>>>>> Piotr >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> Best regards, >>>>>>>>>> Piotr >>>>>>>>>> >>>>>>>>> >>>>>> >>>>>> -- >>>>>> Best regards, >>>>>> Piotr >>>>>> >>>>> >>>>> >>>>> -- >>>>> Best regards, >>>>> Piotr >>>>> >>>> >>>> >>>> -- >>>> Best regards, >>>> Piotr >>>> >>> >> >> -- >> Best regards, >> Piotr >> >
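As a footnote to the original message above: the broker sanity check it mentions (publishing and consuming with the confluent_kafka library, outside of Beam) might look roughly like the following. Broker address, topic, and group id are placeholders.

from confluent_kafka import Consumer, Producer

BOOTSTRAP = "localhost:9092"   # placeholder broker address
TOPIC = "piotr-test"           # placeholder topic

# Produce a test record.
producer = Producer({"bootstrap.servers": BOOTSTRAP})
producer.produce(TOPIC, value=b"hello")
producer.flush()

# Consume it back to confirm the broker and topic are reachable.
consumer = Consumer({
    "bootstrap.servers": BOOTSTRAP,
    "group.id": "beam-debug",      # placeholder group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe([TOPIC])
msg = consumer.poll(10.0)
# msg is None on timeout; otherwise check msg.error() before using the value.
print("got:", None if msg is None else msg.value())
consumer.close()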