Thank you for clarifying.

I attempted to use the FlinkRunner with Beam 2.22 and am getting the following
error, which I am not sure how to debug:

ERROR:root:java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Starting pipeline kafka = 192.168.1.219:9092, topic = piotr-test
Traceback (most recent call last):
  File "apache_beam/examples/streaming_wordcount_kafka.py", line 73, in <module>
    run()
  File "apache_beam/examples/streaming_wordcount_kafka.py", line 68, in run
    | "WriteUserScoreSums" >> beam.io.WriteToText(args.output)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 547, in __exit__
    self.run().wait_until_finish()
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 583, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-piotr0filipiuk-0612212344-159c632b_45b947f8-bc64-4aad-a96c-6fb2bd834d60 failed in state FAILED: java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.

My setup is (everything runs locally):
Beam: 2.22.0
Kafka: 2.5.0 (https://kafka.apache.org/quickstart, using the default config/server.properties)
Flink: 1.10 (https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/tutorials/local_setup.html)

I run the pipeline using the following command:

python apache_beam/examples/streaming_wordcount_kafka.py
--bootstrap_servers=192.168.1.219:9092 --topic=piotr-test
--runner=FlinkRunner --flink_version=1.10 --flink_master=192.168.1.219:8081
--environment_type=LOOPBACK
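
For reference, the core of streaming_wordcount_kafka.py looks roughly like the
sketch below. This is a reconstruction rather than the exact script; the decode
step and the --output default are placeholders:

import argparse

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--bootstrap_servers", required=True)
    parser.add_argument("--topic", required=True)
    parser.add_argument("--output", default="/tmp/wordcount")  # placeholder
    args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args, streaming=True)
    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         # Cross-language Kafka source, expanded via the Java SDK harness.
         | "ReadFromKafka" >> ReadFromKafka(
             consumer_config={"bootstrap.servers": args.bootstrap_servers},
             topics=[args.topic])
         # ReadFromKafka emits (key, value) pairs of bytes; keep the values.
         | "DecodeValues" >> beam.Map(lambda kv: kv[1].decode("utf-8"))
         | "WriteUserScoreSums" >> beam.io.WriteToText(args.output))


if __name__ == "__main__":
    run()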

I can see the following error in the logs:

ERROR:apache_beam.runners.worker.data_plane:Failed to read inputs in the data plane.
Traceback (most recent call last):
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs
    for elements in elements_iterator:
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "DNS resolution failed"
        debug_error_string = "{"created":"@1591997030.613849000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3941,"referenced_errors":[{"created":"@1591997030.613847000","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":262,"referenced_errors":[{"created":"@1591997030.613847000","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":370,"grpc_status":14,"referenced_errors":[{"created":"@1591997030.613840000","description":"C-ares status is not ARES_SUCCESS: Misformatted domain name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244,"referenced_errors":[{"created":"@1591997030.613728000","description":"C-ares status is not ARES_SUCCESS: Misformatted domain name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244}]}]}]}]}"
>

I thought this might be the culprit; however, the same error also appears when
running the wordcount.py example
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py>,
which succeeds. The error appears only with Flink 1.10, not with Flink 1.9.

Full log attached.

I would appreciate any help or suggestions on how to proceed.
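
In case it helps narrow things down, this is roughly how I verified, outside
of Beam, that the broker is reachable and the topic works, using the
confluent_kafka library (a quick sketch; the group id is arbitrary):

from confluent_kafka import Consumer, Producer

BOOTSTRAP = "192.168.1.219:9092"
TOPIC = "piotr-test"

# Publish a single test message.
producer = Producer({"bootstrap.servers": BOOTSTRAP})
producer.produce(TOPIC, value=b"hello")
producer.flush()

# Read it back from the beginning of the topic.
consumer = Consumer({
    "bootstrap.servers": BOOTSTRAP,
    "group.id": "beam-debug",  # arbitrary group id for this check
    "auto.offset.reset": "earliest",
})
consumer.subscribe([TOPIC])
msg = consumer.poll(timeout=10.0)
print(msg.value() if msg is not None else "no message received")
consumer.close()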


On Mon, Jun 8, 2020 at 5:49 PM Heejong Lee <heej...@google.com> wrote:

> DirectRunner is not well-tested for xlang transforms, and you need to
> specify the jar_packages experimental flag for Java dependencies from the
> Python SDK. I'd recommend using 2.22 + FlinkRunner for xlang pipelines.
>
> On Mon, Jun 8, 2020 at 3:27 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> To clarify, the Kafka dependency was already available as an embedded
>> dependency in the Java SDK Harness, but I am not sure whether this worked
>> for DirectRunner. Starting with 2.22, we'll be staging dependencies from
>> the environment during pipeline submission.
>>
>> On Mon, Jun 8, 2020 at 3:23 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> It seems like the Java dependency is not being properly set up when
>>> running the cross-language Kafka step. I don't think this was available
>>> for Beam 2.21. Can you try with the latest Beam HEAD, or with Beam 2.22
>>> when it's released?
>>> +Heejong Lee <heej...@google.com>
>>>
>>> On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk <piotr.filip...@gmail.com>
>>> wrote:
>>>
>>>> Pasting the error inline:
>>>>
>>>> ERROR:root:severity: ERROR
>>>> timestamp {
>>>>   seconds: 1591405163
>>>>   nanos: 815000000
>>>> }
>>>> message: "Client failed to dequeue and process the value"
>>>> trace: "org.apache.beam.sdk.util.UserCodeException:
>>>> java.lang.NoClassDefFoundError:
>>>> org/springframework/expression/EvaluationContext\n\tat
>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\n\tat
>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>>>> Source)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>>>> Source)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>>> Source)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>>>> java.lang.Thread.run(Thread.java:748)\nCaused by:
>>>> java.lang.NoClassDefFoundError:
>>>> org/springframework/expression/EvaluationContext\n\tat
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\nCaused
>>>> by: java.lang.ClassNotFoundException:
>>>> org.springframework.expression.EvaluationContext\n\tat
>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat
>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat
>>>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat
>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat
>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>>>> Source)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>>>> Source)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>>> Source)\n\tat
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>>>> java.lang.Thread.run(Thread.java:748)\n"
>>>> log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
>>>>
>>>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk <piotr.filip...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thank you for the suggestions.
>>>>>
>>>>> Neither Kafka nor Flink runs in a Docker container; everything runs
>>>>> locally. Furthermore, the same issue happens with the DirectRunner. That
>>>>> being said, changing from localhost:$PORT to $IP_ADDR:$PORT resulted in
>>>>> a different error; see attached.
>>>>>
>>>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy <
>>>>> venkat_pack...@yahoo.com> wrote:
>>>>>
>>>>>> Is Kafka itself running inside another container? If so, inspect that
>>>>>> container to see whether it has a network alias; if it does, add that
>>>>>> alias to your /etc/hosts file and map it to 127.0.0.1.
>>>>>>
>>>>>>
>>>>>>
>>>>>> From: Chamikara Jayalath <chamik...@google.com>
>>>>>> Sent: Friday, June 5, 2020 2:58 PM
>>>>>> To: Luke Cwik <lc...@google.com>
>>>>>> Cc: user <u...@beam.apache.org>; dev <dev@beam.apache.org>; Heejong
>>>>>> Lee <heej...@google.com>
>>>>>> Subject: Re: Python SDK ReadFromKafka: Timeout expired while fetching
>>>>>> topic metadata
>>>>>>
>>>>>>
>>>>>>
>>>>>> Is it possible that 'localhost:9092' is not available from the Docker
>>>>>> environment where the Flink step is executed? Can you try specifying
>>>>>> the actual IP address of the node running the Kafka broker?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>> +dev <dev@beam.apache.org> +Chamikara Jayalath <chamik...@google.com>
>>>>>>  +Heejong Lee <heej...@google.com>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <
>>>>>> piotr.filip...@gmail.com> wrote:
>>>>>>
>>>>>> I am unable to read from Kafka and am getting the following warnings
>>>>>> and errors when calling kafka.ReadFromKafka() (Python SDK):
>>>>>>
>>>>>>
>>>>>>
>>>>>> WARNING:root:severity: WARN
>>>>>> timestamp {
>>>>>>   seconds: 1591370012
>>>>>>   nanos: 523000000
>>>>>> }
>>>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node
>>>>>> -1 could not be established. Broker may not be available."
>>>>>> log_location: "org.apache.kafka.clients.NetworkClient"
>>>>>> thread: "18"
>>>>>>
>>>>>>
>>>>>>
>>>>>> Finally, the pipeline fails with:
>>>>>>
>>>>>>
>>>>>>
>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>>>> java.lang.RuntimeException:
>>>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
>>>>>> fetching topic metadata
>>>>>>
>>>>>>
>>>>>>
>>>>>> See more complete log attached.
>>>>>>
>>>>>>
>>>>>>
>>>>>> The relevant code snippet:
>>>>>>
>>>>>>
>>>>>>
>>>>>> consumer_conf = {"bootstrap.servers": "localhost:9092"}
>>>>>> ...
>>>>>> kafka.ReadFromKafka(
>>>>>>     consumer_config=consumer_conf,
>>>>>>     topics=[args.topic],
>>>>>> )
>>>>>> ...
>>>>>>
>>>>>>
>>>>>> Also see the full Python script attached.
>>>>>>
>>>>>>
>>>>>>
>>>>>> I am using Beam 2.21.0 and the DirectRunner. With the FlinkRunner I am
>>>>>> also not able to read from the topic.
>>>>>>
>>>>>>
>>>>>>
>>>>>> I am using Kafka 2.5.0 and started the broker by following
>>>>>> https://kafka.apache.org/quickstart, using the default
>>>>>> config/server.properties.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Everything runs locally, and I verified that I can publish to and
>>>>>> consume from that topic using the confluent_kafka library.
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Best regards,
>>>>>> Piotr
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Piotr
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Piotr
>>>>
>>>

-- 
Best regards,
Piotr

Attachment: beam.log