Thank you for the replies. I am really stuck and cannot proceed.
Yes, the previous trial message was published with a null key.
But when I send a key:value pair through the producer using

./bin/ --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=:"
> tryKey:tryValue
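
For reference, here is a minimal Python sketch of how I understand the console producer handles an input line when parse.key=true and key.separator=: are set: the text before the first separator becomes the key and the rest becomes the value. The function name split_key_value is hypothetical and the code only mimics that behavior under this assumption; it is not Kafka's implementation.

```python
def split_key_value(line, separator=":"):
    """Split one producer input line into (key, value) bytes.

    Assumption: the split happens at the FIRST occurrence of the
    separator, and a line without a separator is rejected.
    """
    index = line.find(separator)
    if index == -1:
        raise ValueError(f"no key separator {separator!r} found in line {line!r}")
    key = line[:index]
    value = line[index + len(separator):]
    # Kafka stores keys and values as bytes, so encode both parts.
    return key.encode("utf-8"), value.encode("utf-8")


if __name__ == "__main__":
    # The line typed at the producer prompt above.
    print(split_key_value("tryKey:tryValue"))
```

With the line above, this yields a non-null key and a non-null value, which is the case the consumer side should be able to encode.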

I do not get any error, but Beam does not print the received message. Here
is what my pipeline looks like:
result = (
    pipeline
    | "Read from kafka" >> ReadFromKafka(
        consumer_config={
            "bootstrap.servers": 'localhost:9092',
        },
        topics=['mytopic'],
        expansion_service='localhost:8097',
    )
    | "print" >> beam.Map(print)
)

If this is not the right way to make Beam and Kafka communicate, then please
share a working example that shows how a message published to Kafka is
received by Beam while streaming.

Ayush Sharma

On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <> wrote:

> Yes, seems like this is due to the key being null. XLang KafkaIO has to be
> updated to support this. You should not run into this error if you publish
> keys and values that are not null.
> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <> wrote:
>> +dev <>
>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <> wrote:
>>> +Heejong Lee <> +Chamikara Jayalath
>>> <>
>>> Do you know if your trial record has an empty key or value?
>>> If so, then you hit a bug; it seems that support for this use case was
>>> missed.
>>> Heejong and Cham,
>>> According to the Javadoc, ByteArrayDeserializer and other
>>> Deserializers can return null[1, 2], and we aren't using
>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>> non-XLang KafkaIO does this correctly in its regular coder inference
>>> logic[4]. I filed BEAM-10529[5].
>>> 1:
>>> 2:
>>> 3:
>>> 4:
>>> 5:
>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <>
>>> wrote:
>>>> Hi,
>>>> I am trying to build a streaming Beam pipeline in Python which should
>>>> capture messages from Kafka and then execute further stages that fetch
>>>> data from other sources and aggregate it. The step-by-step process of
>>>> what I have built so far is:
>>>>    1. Run a Kafka instance on localhost:9092
>>>>    ./bin/ ./config/
>>>>    2. Run the beam-flink job server using docker
>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>    3. Run the beam-kafka pipeline
>>>> import apache_beam as beam
>>>> from import ReadFromKafka, WriteToKafka
>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>> if __name__ == '__main__':
>>>>     options = PipelineOptions([
>>>>         "--job_endpoint=localhost:8099",
>>>>         "--environment_type=LOOPBACK",
>>>>         "--streaming",
>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>     ])
>>>>     options = options.view_as(StandardOptions)
>>>>     options.streaming = True
>>>>     pipeline = beam.Pipeline(options=options)
>>>>     result = (
>>>>         pipeline
>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>             consumer_config={
>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>             },
>>>>             topics=['mytopic'],
>>>>             expansion_service='localhost:8097',
>>>>         )
>>>>         | beam.Map(print)
>>>>     )
>>>>    4. Publish a new message using
>>>> ./bin/ --broker-list localhost:9092 --topic mytopic
>>>> > tryme
>>>> After I publish this trial message, the Beam pipeline receives the
>>>> message but crashes with this error:
>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(
>>>>     at 
>>>>     at 
>>>>  Source)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(
>>>>     at org.apache.beam
>>>> Regards,
>>>> Ayush Sharma.
