Yeah, this is a known issue. According to +Boyuan Zhang <boyu...@google.com>'s
comment in the bug, you should still be able to read as long as the Kafka
consumer is set up to auto-commit offsets, and these errors can be safely
ignored. For example, you can set "enable.auto.commit" to "true" in the
consumer config passed to ReadFromKafka.
I haven't tried this myself, though, so please comment in the JIRA if this is
a true blocker for reading from Kafka; that will help us identify the actual
priority of this JIRA.
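A minimal sketch of the auto-commit suggestion, just as untested as the suggestion itself. It assumes the same local broker, topic and expansion service used earlier in this thread. The helper names build_consumer_config and build_pipeline are hypothetical.

```python
def build_consumer_config(bootstrap_servers="localhost:9092"):
    """Return a Kafka consumer config with offset auto-commit enabled.

    ReadFromKafka hands these entries to the Java Kafka consumer,
    so both keys and values are plain strings.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        # Let the Kafka consumer commit offsets on its own.
        "enable.auto.commit": "true",
    }


def build_pipeline(options, bootstrap_servers="localhost:9092",
                   topic="mytopic", expansion_service="localhost:8097"):
    # Imported lazily so build_consumer_config works without Beam installed.
    import apache_beam as beam
    from apache_beam.io.external.kafka import ReadFromKafka

    pipeline = beam.Pipeline(options=options)
    _ = (
        pipeline
        | "Read from kafka" >> ReadFromKafka(
            consumer_config=build_consumer_config(bootstrap_servers),
            topics=[topic],
            expansion_service=expansion_service,
        )
        | "Print" >> beam.Map(print)
    )
    return pipeline
```

The pipeline would then be started with build_pipeline(options).run(), using the same PipelineOptions as before.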

Thanks,
Cham

On Mon, Jul 20, 2020 at 2:40 PM ayush sharma <1705ay...@gmail.com> wrote:

> Is there any workaround to this issue?
>
> On Mon, Jul 20, 2020 at 5:33 PM ayush sharma <1705ay...@gmail.com> wrote:
>
>> Thank you for the suggestions. I tried using the FlinkRunner, and setting
>> environment_type to either DOCKER or LOOPBACK gives this error:
>> java.lang.UnsupportedOperationException: The ActiveBundle does not have a
>> registered bundle checkpoint handler.
>>
>> I found that this issue has already been reported (
>> https://issues.apache.org/jira/browse/BEAM-6868), so I have upvoted it.
>> Thank you for the prompt responses; I look forward to using this
>> feature in the future.
>>
>> Regards,
>> Ayush.
>>
>> On Sat, Jul 18, 2020 at 3:14 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <1705ay...@gmail.com>
>>> wrote:
>>>
>>>> Thank you all for the replies. I am really stuck and cannot proceed
>>>> further.
>>>> Yes, the trial message I published earlier had a null key.
>>>> But when I send a key:value pair through the producer using
>>>>
>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic \
>>>>     mytopic --property "parse.key=true" --property "key.separator=:"
>>>> > tryKey:tryValue
>>>>
>>>> I do not get any error, but Beam does not print the received message.
>>>> Here is what my pipeline looks like:
>>>> result = (
>>>>         pipeline
>>>>
>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>             consumer_config={
>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>             },
>>>>             topics=['mytopic'],
>>>>             expansion_service='localhost:8097',
>>>>         )
>>>>
>>>>         | "print" >> beam.Map(print)
>>>>         )
>>>>
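With the default ByteArrayDeserializer for keys and values, each element printed by that pipeline should be a (key, value) tuple of bytes, for example (b'tryKey', b'tryValue'). Below is a hypothetical helper, decode_record, that turns such tuples into text before printing. It assumes UTF-8 payloads and passes a missing key or value through as None.

```python
def decode_record(record, encoding="utf-8"):
    """Decode a (key, value) pair of bytes from ReadFromKafka into strings.

    A missing (None) key or value is passed through unchanged as None.
    """
    key, value = record
    decoded_key = key.decode(encoding) if key is not None else None
    decoded_value = value.decode(encoding) if value is not None else None
    return decoded_key, decoded_value


if __name__ == "__main__":
    print(decode_record((b"tryKey", b"tryValue")))  # prints ('tryKey', 'tryValue')
```

In the pipeline it would sit between the two steps: | "Decode" >> beam.Map(decode_record)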
>>>>
>>> I suspect the DirectRunner in LOOPBACK mode might not be working for
>>> cross-language transforms today. (Since your options do not set --runner,
>>> the pipeline runs on the DirectRunner by default.) Please note that the
>>> cross-language transforms framework is fairly new [1] and we are adding
>>> support for various runners and environment configurations.
>>> Can you try with Flink in DOCKER mode?
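A sketch of pipeline options for that setup. It assumes the Flink job server from the original message is listening on localhost:8099. --runner=PortableRunner is set explicitly because, without a --runner flag, the Python SDK falls back to the DirectRunner. The helper names flink_docker_args and make_options are hypothetical.

```python
def flink_docker_args(job_endpoint="localhost:8099"):
    """Command-line flags for a portable Flink job with Docker SDK workers."""
    return [
        # Submit to the Flink job server instead of the default DirectRunner.
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        # Run the Python SDK harness in a Docker container; with no
        # --environment_config, the SDK's default container image is used.
        "--environment_type=DOCKER",
        "--streaming",
    ]


def make_options(job_endpoint="localhost:8099"):
    # Imported lazily so flink_docker_args works without Beam installed.
    from apache_beam.options.pipeline_options import PipelineOptions
    return PipelineOptions(flink_docker_args(job_endpoint))
```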
>>>
>>>
>>>> If this is not the right way to make Beam and Kafka communicate, please
>>>> share a working example that shows how a message published to Kafka
>>>> is received by Beam in a streaming pipeline.
>>>>
>>>
>>> I'm adding an example, but so far I've only tested it with Dataflow. I
>>> hope to test that example on more runners and add further instructions
>>> there.
>>> https://github.com/apache/beam/pull/12188
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>>>
>>>>
>>>> Regards,
>>>> Ayush Sharma
>>>>
>>>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>> Yes, it seems this is due to the key being null. Cross-language (XLang)
>>>>> KafkaIO has to be updated to support this. You should not run into this
>>>>> error if you publish keys and values that are not null.
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> +dev <d...@beam.apache.org>
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> +Heejong Lee <heej...@google.com> +Chamikara Jayalath
>>>>>>> <chamik...@google.com>
>>>>>>>
>>>>>>> Do you know if your trial record has an empty key or value?
>>>>>>> If so, you have hit a bug; it seems this use case was missed.
>>>>>>>
>>>>>>> Heejong and Cham,
>>>>>>> According to their Javadoc, ByteArrayDeserializer and other
>>>>>>> Deserializers can return null [1, 2], but we aren't using
>>>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion [3]. Note that
>>>>>>> the non-XLang KafkaIO does this correctly in its regular coder
>>>>>>> inference logic [4]. I filed BEAM-10529 [5].
>>>>>>>
>>>>>>> 1:
>>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>>> 2:
>>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>>> 3:
>>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>>>> 4:
>>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
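To illustrate the difference Luke describes, here is a toy Python model of a plain byte-array coder and a nullable wrapper around it. The wrapper loosely follows the idea behind Beam's Java NullableCoder: one marker byte for null or present, followed by the wrapped coder's encoding. This is a hypothetical illustration, not Beam's actual wire format. The 4-byte big-endian length prefix is a simplification, and CoderException here is only a stand-in class.

```python
import struct


class CoderException(Exception):
    """Stand-in for Beam's CoderException in this toy model."""


def encode_bytes(value):
    """Toy non-nullable byte-array coder: 4-byte length prefix + payload."""
    if value is None:
        # Mirrors the failure in the stack trace: null is not representable.
        raise CoderException("cannot encode a null byte[]")
    return struct.pack(">I", len(value)) + value


def decode_bytes(data):
    (length,) = struct.unpack(">I", data[:4])
    return data[4:4 + length]


def encode_nullable(value):
    """Toy nullable wrapper: b'\\x00' for None, else b'\\x01' + inner encoding."""
    if value is None:
        return b"\x00"
    return b"\x01" + encode_bytes(value)


def decode_nullable(data):
    if data[:1] == b"\x00":
        return None
    return decode_bytes(data[1:])
```

A null key, as produced by the console producer without parse.key, then round-trips as None instead of failing.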
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <1705ay...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am trying to build a streaming Beam pipeline in Python that
>>>>>>>> captures messages from Kafka and then runs further stages that fetch
>>>>>>>> data from other sources and aggregate it. Here is what I have built
>>>>>>>> so far, step by step:
>>>>>>>>
>>>>>>>>    1. Run a Kafka instance on localhost:9092
>>>>>>>>
>>>>>>>>       ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>>>
>>>>>>>>    2. Run the Beam Flink job server using Docker
>>>>>>>>
>>>>>>>>       docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>>>
>>>>>>>>    3. Run the Beam Kafka pipeline
>>>>>>>>
>>>>>>>> import apache_beam as beam
>>>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>>>
>>>>>>>> if __name__ == '__main__':
>>>>>>>>     options = PipelineOptions([
>>>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>>>         "--environment_type=LOOPBACK",
>>>>>>>>         "--streaming",
>>>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>>>     ])
>>>>>>>>
>>>>>>>>     options = options.view_as(StandardOptions)
>>>>>>>>     options.streaming = True
>>>>>>>>
>>>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>>>
>>>>>>>>     result = (
>>>>>>>>         pipeline
>>>>>>>>
>>>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>>>             consumer_config={
>>>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>>>             },
>>>>>>>>             topics=['mytopic'],
>>>>>>>>             expansion_service='localhost:8097',
>>>>>>>>         )
>>>>>>>>
>>>>>>>>         | beam.Map(print)
>>>>>>>>     )
>>>>>>>>
>>>>>>>>     pipeline.run()
>>>>>>>>
>>>>>>>>
>>>>>>>>    4. Publish a new message using kafka-console-producer.sh
>>>>>>>>
>>>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>>>> >tryme
>>>>>>>>
>>>>>>>> After publishing this trial message, the Beam pipeline receives
>>>>>>>> the message but crashes with this error:
>>>>>>>>
>>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>>>     at org.apache.beam
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Ayush Sharma.
>>>>>>>>
>>>>>>>>
