Yes, it seems like this is due to the key being null. The XLang KafkaIO has to
be updated to support this. You should not run into this error if you publish
keys and values that are not null.
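
In the meantime, one way to publish a keyed record from the console producer is
to turn on key parsing (parse.key and key.separator are standard
console-producer properties, so this is a generic Kafka workaround rather than
anything KafkaIO-specific; "mykey" is just an arbitrary non-null key):

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic \
    --property parse.key=true --property key.separator=:
>mykey:tryme

Each line you type is then sent as key:value instead of a value with a null key.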




On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:

> +dev <d...@beam.apache.org>
>
> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>
>> +Heejong Lee <heej...@google.com> +Chamikara Jayalath
>> <chamik...@google.com>
>>
>> Do you know if your trial record has an empty key or value?
>> If so, then you hit a bug; it looks like we missed supporting this use
>> case.
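>>
>> One quick way to check from the Kafka side is to read the topic back with the
>> console consumer and print the keys (print.key is a standard console-consumer
>> property, so this is plain Kafka tooling, nothing Beam-specific):
>>
>> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
>>     --topic mytopic --from-beginning --property print.key=true
>>
>> A record that was published without a key shows up with a literal null key.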
>>
>> Heejong and Cham,
>> Per the Javadoc, ByteArrayDeserializer and the other Deserializers can
>> return null [1, 2], but we aren't using NullableCoder.of(ByteArrayCoder.of())
>> in the expansion [3]. Note that the non-XLang KafkaIO handles this correctly
>> in its regular coder inference logic [4]. I filed BEAM-10529 [5].
>>
>> 1:
>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>> 2:
>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>> 3:
>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>> 4:
>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>> 5: https://issues.apache.org/jira/browse/BEAM-10529
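>>
>> For illustration, a minimal sketch (in Java) of the kind of wrapping that
>> would be needed, assuming the expansion sets the key/value coders explicitly;
>> kvCoder is just an illustrative name, and this is only to show the idea, not
>> the actual patch:
>>
>> import org.apache.beam.sdk.coders.ByteArrayCoder;
>> import org.apache.beam.sdk.coders.KvCoder;
>> import org.apache.beam.sdk.coders.NullableCoder;
>>
>> // Hypothetical sketch: wrap the byte[] coders so a null key or value can be
>> // encoded, mirroring what the non-XLang coder inference already does.
>> KvCoder<byte[], byte[]> kvCoder =
>>     KvCoder.of(
>>         NullableCoder.of(ByteArrayCoder.of()),
>>         NullableCoder.of(ByteArrayCoder.of()));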
>>
>>
>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <1705ay...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to build a streaming Beam pipeline in Python which should
>>> capture messages from Kafka and then execute further stages of data
>>> fetching from other sources and aggregation. What I have built so far,
>>> step by step, is:
>>>
>>>    1. Run a Kafka instance on localhost:9092:
>>>
>>>       ./bin/kafka-server-start.sh ./config/server.properties
>>>
>>>    2. Run the Beam Flink job server using Docker:
>>>
>>>       docker run --net=host apache/beam_flink1.10_job_server:latest
>>>
>>>    3. Run the Beam Kafka pipeline:
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>
>>> if __name__ == '__main__':
>>>     options = PipelineOptions([
>>>         "--job_endpoint=localhost:8099",
>>>         "--environment_type=LOOPBACK",
>>>         "--streaming",
>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>     ])
>>>
>>>     options = options.view_as(StandardOptions)
>>>     options.streaming = True
>>>
>>>     pipeline = beam.Pipeline(options=options)
>>>
>>>     result = (
>>>         pipeline
>>>
>>>         | "Read from kafka" >> ReadFromKafka(
>>>             consumer_config={
>>>                 "bootstrap.servers": 'localhost:9092',
>>>             },
>>>             topics=['mytopic'],
>>>             expansion_service='localhost:8097',
>>>         )
>>>
>>>         | beam.Map(print)
>>>     )
>>>
>>>     pipeline.run()
>>>
>>>
>>>    4. Publish a new message using kafka-console-producer.sh:
>>>
>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>> >tryme
>>>
>>> After publishing this trial message, the Beam pipeline receives the
>>> message but crashes with this error:
>>>
>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>     at org.apache.beam
>>>
>>> Regards,
>>>
>>> Ayush Sharma.
>>>
>>>
