Thank you for clarifying. I attempted to use FlinkRunner with 2.22 and I am getting the following error, which I am not sure how to debug:
ERROR:root:java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Starting pipeline kafka = 192.168.1.219:9092, topic = piotr-test
Traceback (most recent call last):
  File "apache_beam/examples/streaming_wordcount_kafka.py", line 73, in <module>
    run()
  File "apache_beam/examples/streaming_wordcount_kafka.py", line 68, in run
    | "WriteUserScoreSums" >> beam.io.WriteToText(args.output)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 547, in __exit__
    self.run().wait_until_finish()
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 583, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-piotr0filipiuk-0612212344-159c632b_45b947f8-bc64-4aad-a96c-6fb2bd834d60 failed in state FAILED: java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.

My setup (everything runs locally):
- Beam 2.22.0
- Kafka 2.5.0 (https://kafka.apache.org/quickstart - using default config/server.properties)
- Flink 1.10 (https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/tutorials/local_setup.html)

I run the pipeline using the following command:

python apache_beam/examples/streaming_wordcount_kafka.py --bootstrap_servers=192.168.1.219:9092 --topic=piotr-test --runner=FlinkRunner --flink_version=1.10 --flink_master=192.168.1.219:8081 --environment_type=LOOPBACK

I can see the following error in the logs:

ERROR:apache_beam.runners.worker.data_plane:Failed to read inputs in the data plane.
Traceback (most recent call last):
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs
    for elements in elements_iterator:
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "DNS resolution failed"
    debug_error_string = "{"created":"@1591997030.613849000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3941,"referenced_errors":[{"created":"@1591997030.613847000","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":262,"referenced_errors":[{"created":"@1591997030.613847000","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":370,"grpc_status":14,"referenced_errors":[{"created":"@1591997030.613840000","description":"C-ares status is not ARES_SUCCESS: Misformatted domain name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244,"referenced_errors":[{"created":"@1591997030.613728000","description":"C-ares status is not ARES_SUCCESS: Misformatted domain name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244}]}]}]}]}"
>

I thought this might be the culprit; however, the same error also appears when running the wordcount.py example <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py>, which succeeds. The error appears only with Flink 1.10, not with Flink 1.9. Full log attached.
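In case it helps anyone reproducing this, here is the minimal stdlib check I can use to confirm the broker address is reachable from the machine submitting the job. The host and port are the ones from my run command; note this is only a coarse TCP-connect check, not a Kafka protocol handshake:

```python
import socket

def broker_reachable(host, port, timeout=5.0):
    """Return True if a plain TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Address from my run command; adjust as needed:
# print(broker_reachable("192.168.1.219", 9092))
```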
I would appreciate help and suggestions on how to proceed.

On Mon, Jun 8, 2020 at 5:49 PM Heejong Lee <heej...@google.com> wrote:

> DirectRunner is not well-tested for xlang transforms, and you need to
> specify the jar_packages experimental flag for Java dependencies from the Python
> SDK. I'd recommend using 2.22 + FlinkRunner for xlang pipelines.
>
> On Mon, Jun 8, 2020 at 3:27 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
>> To clarify, the Kafka dependency was already available as an embedded
>> dependency in the Java SDK Harness, but I am not sure if this worked for
>> DirectRunner. Starting with 2.22 we'll be staging dependencies from the
>> environment during pipeline submission.
>>
>> On Mon, Jun 8, 2020 at 3:23 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>
>>> Seems like the Java dependency is not being properly set up when running the
>>> cross-language Kafka step. I don't think this was available for Beam 2.21.
>>> Can you try with the latest Beam HEAD or Beam 2.22 when it's released?
>>> +Heejong Lee <heej...@google.com>
>>>
>>> On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>>>
>>>> Pasting the error inline:
>>>>
>>>> ERROR:root:severity: ERROR
>>>> timestamp {
>>>>   seconds: 1591405163
>>>>   nanos: 815000000
>>>> }
>>>> message: "Client failed to dequeue and process the value"
>>>> trace: "org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)
>>>>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)
>>>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)
>>>>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>     at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>>>     at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>     at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>>>>     at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>>>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)
>>>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>>>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>>>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>>>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)
>>>> Caused by: java.lang.ClassNotFoundException: org.springframework.expression.EvaluationContext
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>>>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>>>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>>>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>>>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)
>>>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)
>>>>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)
>>>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)
>>>>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>     at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>>>     at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>     at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>>>>     at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>>>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)
>>>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)"
>>>> log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
>>>>
>>>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>>>>
>>>>> Thank you for the suggestions.
>>>>>
>>>>> Neither Kafka nor Flink runs in a Docker container; they all run locally. Furthermore, the same issue happens with the DirectRunner. That said, changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a different error, see attached.
>>>>>
>>>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy <venkat_pack...@yahoo.com> wrote:
>>>>>
>>>>>> Is Kafka itself running inside another container?
>>>>>> If so, inspect that container and see if it has a network alias; add that alias to your /etc/hosts file and map it to 127.0.0.1.
>>>>>>
>>>>>> From: Chamikara Jayalath <chamik...@google.com>
>>>>>> Sent: Friday, June 5, 2020 2:58 PM
>>>>>> To: Luke Cwik <lc...@google.com>
>>>>>> Cc: user <u...@beam.apache.org>; dev <dev@beam.apache.org>; Heejong Lee <heej...@google.com>
>>>>>> Subject: Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
>>>>>>
>>>>>> Is it possible that "localhost:9092" is not available from the Docker environment where the Flink step is executed? Can you try specifying the actual IP address of the node running the Kafka broker?
>>>>>>
>>>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>> +dev <dev@beam.apache.org> +Chamikara Jayalath <chamik...@google.com> +Heejong Lee <heej...@google.com>
>>>>>>
>>>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>>>>>>
>>>>>> I am unable to read from Kafka and am getting the following warnings & errors when calling kafka.ReadFromKafka() (Python SDK):
>>>>>>
>>>>>> WARNING:root:severity: WARN
>>>>>> timestamp {
>>>>>>   seconds: 1591370012
>>>>>>   nanos: 523000000
>>>>>> }
>>>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could not be established. Broker may not be available."
>>>>>> log_location: "org.apache.kafka.clients.NetworkClient"
>>>>>> thread: "18"
>>>>>>
>>>>>> Finally the pipeline fails with:
>>>>>>
>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>>>>>>
>>>>>> See the more complete log attached.
>>>>>> The relevant code snippet:
>>>>>>
>>>>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'}
>>>>>> ...
>>>>>> kafka.ReadFromKafka(
>>>>>>     consumer_config=consumer_conf,
>>>>>>     topics=[args.topic],
>>>>>> )
>>>>>> ...
>>>>>>
>>>>>> Also see the full Python script attached.
>>>>>>
>>>>>> I am using Beam 2.21.0 and the DirectRunner. With the Flink Runner I am also not able to read from the topic.
>>>>>>
>>>>>> I am using Kafka 2.5.0 and started the broker by following https://kafka.apache.org/quickstart - using the default config/server.properties.
>>>>>>
>>>>>> Everything runs locally, and I verified that I can publish & consume from that topic using the confluent_kafka library.

--
Best regards,
Piotr
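Following up on Chamikara's suggestion to use the node's actual IP address instead of localhost: a small sketch (the helper name is my own, not from the thread) of how the bootstrap address could be derived programmatically. The UDP connect trick sends no packets; it only asks the OS which local address it would route from, and falls back to loopback if that fails:

```python
import socket

def local_ip():
    """Best-effort guess of this machine's non-loopback IPv4 address."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))  # no traffic is actually sent
            return s.getsockname()[0]
    except OSError:
        return "127.0.0.1"

# Same shape as the consumer_config snippet earlier in the thread;
# the default Kafka quickstart broker port 9092 is assumed.
consumer_conf = {"bootstrap.servers": "%s:9092" % local_ip()}
print(consumer_conf)
```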
Attachment: beam.log (binary data)