To clarify, the Kafka dependency was already available as an embedded
dependency in the Java SDK Harness, but I'm not sure whether this worked for
the DirectRunner. Starting with 2.22 we'll be staging dependencies from the
environment during pipeline submission.
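
In the meantime, a possible workaround (just a sketch, not the official
fix; the port below is a placeholder and this assumes you have started the
Kafka expansion service yourself with the missing jars on its classpath) is
to point the transform at an explicit expansion service:

kafka.ReadFromKafka(
    consumer_config=consumer_conf,
    topics=[args.topic],
    # hypothetical endpoint of a manually started expansion service
    expansion_service='localhost:8097',
)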

On Mon, Jun 8, 2020 at 3:23 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> Seems like the Java dependency is not being properly set up when running the
> cross-language Kafka step. I don't think this was available for Beam 2.21.
> Can you try with the latest Beam HEAD, or Beam 2.22 when it's released?
> +Heejong Lee <heej...@google.com>
>
> On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk <piotr.filip...@gmail.com>
> wrote:
>
>> Pasting the error inline:
>>
>> ERROR:root:severity: ERROR
>> timestamp {
>>   seconds: 1591405163
>>   nanos: 815000000
>> }
>> message: "Client failed to dequeue and process the value"
>> trace: "org.apache.beam.sdk.util.UserCodeException:
>> java.lang.NoClassDefFoundError:
>> org/springframework/expression/EvaluationContext\n\tat
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\n\tat
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>> Source)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>> Source)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>> Source)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>> java.lang.Thread.run(Thread.java:748)\nCaused by:
>> java.lang.NoClassDefFoundError:
>> org/springframework/expression/EvaluationContext\n\tat
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\nCaused
>> by: java.lang.ClassNotFoundException:
>> org.springframework.expression.EvaluationContext\n\tat
>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat
>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat
>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat
>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>> Source)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>> Source)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>> Source)\n\tat
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>> java.lang.Thread.run(Thread.java:748)\n"
>> log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
>>
>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk <piotr.filip...@gmail.com>
>> wrote:
>>
>>> Thank you for the suggestions.
>>>
>>> Neither Kafka nor Flink runs in a Docker container; they both run locally.
>>> Furthermore, the same issue happens with the DirectRunner. That being said,
>>> changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a different
>>> error, see attached.
>>>
>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy <
>>> venkat_pack...@yahoo.com> wrote:
>>>
>>>> Is Kafka itself running inside another container? If so, inspect that
>>>> container to see whether it has a network alias; if it does, add that
>>>> alias to your /etc/hosts file and map it to 127.0.0.1.
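>>>>
>>>> For example (container name and alias here are placeholders):
>>>>
>>>> $ docker inspect --format '{{json .NetworkSettings.Networks}}' my-kafka
>>>> # if the output shows an alias such as "kafka", add to /etc/hosts:
>>>> 127.0.0.1   kafka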
>>>>
>>>>
>>>>
>>>> *From:* Chamikara Jayalath <chamik...@google.com>
>>>> *Sent:* Friday, June 5, 2020 2:58 PM
>>>> *To:* Luke Cwik <lc...@google.com>
>>>> *Cc:* user <user@beam.apache.org>; dev <d...@beam.apache.org>; Heejong
>>>> Lee <heej...@google.com>
>>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while
>>>> fetching topic metadata
>>>>
>>>>
>>>>
>>>> Is it possible that 'localhost:9092' is not available from the Docker
>>>> environment where the Flink step is executed? Can you try specifying
>>>> the actual IP address of the node running the Kafka broker?
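>>>>
>>>> For example (the address below is a placeholder for the broker host's IP):
>>>>
>>>> consumer_conf = {"bootstrap.servers": "192.168.1.42:9092"}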
>>>>
>>>>
>>>>
>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>> +dev <d...@beam.apache.org> +Chamikara Jayalath <chamik...@google.com> 
>>>> +Heejong
>>>> Lee <heej...@google.com>
>>>>
>>>>
>>>>
>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <piotr.filip...@gmail.com>
>>>> wrote:
>>>>
>>>> I am unable to read from Kafka and am getting the following warnings and
>>>> errors when calling kafka.ReadFromKafka() (Python SDK):
>>>>
>>>>
>>>>
>>>> WARNING:root:severity: WARN
>>>> timestamp {
>>>>   seconds: 1591370012
>>>>   nanos: 523000000
>>>> }
>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node
>>>> -1 could not be established. Broker may not be available."
>>>> log_location: "org.apache.kafka.clients.NetworkClient"
>>>> thread: "18"
>>>>
>>>>
>>>>
>>>> Finally the pipeline fails with:
>>>>
>>>>
>>>>
>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>> java.lang.RuntimeException:
>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
>>>> fetching topic metadata
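>>>>
>>>> A quick sanity check for broker reachability from wherever the harness
>>>> runs (a sketch; host and port are the ones from the consumer config):
>>>>
>>>> import socket
>>>> socket.create_connection(("localhost", 9092), timeout=5).close()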
>>>>
>>>>
>>>>
>>>> See more complete log attached.
>>>>
>>>>
>>>>
>>>> The relevant code snippet:
>>>>
>>>>
>>>>
>>>> consumer_conf = {"bootstrap.servers": "localhost:9092"}
>>>>
>>>> ...
>>>>
>>>> kafka.ReadFromKafka(
>>>>     consumer_config=consumer_conf,
>>>>     topics=[args.topic],
>>>> )
>>>>
>>>> ...
>>>>
>>>>
>>>>
>>>> Also see full python script attached.
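>>>>
>>>> For context, a minimal sketch of the surrounding pipeline (topic name and
>>>> options are placeholders; this is not the attached script):
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.io import kafka
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>> options = PipelineOptions(["--streaming"])
>>>> with beam.Pipeline(options=options) as p:
>>>>     _ = (
>>>>         p
>>>>         | kafka.ReadFromKafka(
>>>>             consumer_config={"bootstrap.servers": "localhost:9092"},
>>>>             topics=["my-topic"],
>>>>         )
>>>>         | beam.Map(print)
>>>>     )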
>>>>
>>>>
>>>>
>>>> I am using Beam version 2.21.0 with the DirectRunner. With the Flink
>>>> runner I am also not able to read from the topic.
>>>>
>>>>
>>>>
>>>> I am using Kafka 2.5.0 and started the broker by following
>>>> https://kafka.apache.org/quickstart, using the default
>>>> config/server.properties.
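>>>>
>>>> If the broker needs to be reachable at an address other than localhost,
>>>> the relevant server.properties settings would be along these lines (the
>>>> IP is a placeholder):
>>>>
>>>> listeners=PLAINTEXT://0.0.0.0:9092
>>>> advertised.listeners=PLAINTEXT://192.168.1.42:9092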
>>>>
>>>>
>>>>
>>>> Everything runs locally, and I verified that I can publish to and consume
>>>> from that topic using the confluent_kafka library.
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Best regards,
>>>> Piotr
>>>>
>>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>>
>> --
>> Best regards,
>> Piotr
>>
>
