On Fri, Mar 26, 2021 at 12:11 AM yilun zhang <ilyak1...@gmail.com> wrote:

> Hey,
>
> We are using Docker mode to run Kafka with the Beam Python SDK and
> encountered an error:
>
> Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null
> byte[]
> at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
> at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
> at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
> at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)
> at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
> at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
> at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
> at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
> at
> org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:112)
> at
> org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:180)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)
>
>
>
> The simple code we have:
> from __future__ import absolute_import
>
> import logging
>
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka,WriteToKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
>
> def run_pipeline():
>     def log_i(element):
>         logging.info(element)
>
>
>     with beam.Pipeline(options=PipelineOptions()) as p:
>         (p
>         | 'read from kafka' >> ReadFromKafka(
>             consumer_config={'bootstrap.servers': 'x.x.x.x:9092'},
>             topics=['gggg'])
>         | "print" >> beam.FlatMap(lambda kv: log_i(kv[1]))
>         )
>
> if __name__ == '__main__':
>   logging.getLogger().setLevel(logging.INFO)
>   run_pipeline()
>
>
> The command we run:
> python -m sample --runner=FlinkRunner --flink_version 1.12
> --flink_master=${FLINK_MASTER_URL} --environment_type=DOCKER
> --save_main_session --flink_submit_uber_jar
>
> When the job is up, we manually push records through the Kafka CLI:
>  ./kafka-console-producer.sh --broker-list x.x.x.x:9092 --topic gggg
> --property "parse.key=true" --property "key.separator=:"
> >tryKey:tryValue
> >kkey:vvalue
>
> After some searching, it looks like this error may result from malformed
> Kafka input (or maybe not?). Is there any suggestion on exactly what format
> we should put into Kafka? And is there any way we could handle JSON
> data?
>

This is a known issue tracked by
https://issues.apache.org/jira/browse/BEAM-10529. Currently you cannot use
null values for keys when using the Python Kafka source. The way you push KVs
above should not result in null keys, but could it be that the Kafka topic
already holds some records with null keys, and those are causing the
program to fail?
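
On the JSON question: as far as I know, the Python Kafka source hands each
record to the pipeline as a (key, value) pair of raw bytes by default, so one
possible approach is to decode the value in a Map step right after the read.
Below is a minimal sketch under that assumption. It also assumes the values
are UTF-8 encoded JSON; parse_json_value is a hypothetical helper name, not
part of Beam.

```python
import json
from typing import Any, Tuple


def parse_json_value(record: Tuple[bytes, bytes]) -> Any:
    """Decode the value of a (key, value) Kafka record as UTF-8 JSON.

    Assumes the record arrives as a pair of bytes objects; the key is
    ignored here. Raises ValueError if the value is not valid JSON.
    """
    _key, value = record
    return json.loads(value.decode("utf-8"))


# Hypothetical use inside the pipeline, after the ReadFromKafka step:
#   | 'parse json' >> beam.Map(parse_json_value)
```

Note that a plain string such as tryValue is not valid JSON, so with this
sketch the producer would need to send something like
tryKey:{"word": "tryValue"} instead.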


>
> Also, when we try to write to Kafka, the error is "Unknown Coder URN
> beam:coder:pickled_python:v1". Is there any suggestion for this one?
>

Do you have a pipeline that would reproduce this?
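
In the meantime, one thing that may be worth checking (I can't confirm it is
the cause without a repro): this error can show up when the elements reaching
a cross-language transform have no declared type, so Python falls back to its
pickle coder, which the Java side cannot decode. A minimal sketch of
producing explicitly typed key/value bytes follows; to_kafka_kv is a
hypothetical helper name, and the (str, JSON-serializable) input shape is an
assumption about your data.

```python
import json
from typing import Any, Tuple


def to_kafka_kv(element: Tuple[str, Any]) -> Tuple[bytes, bytes]:
    """Turn a (key, value) pair into the bytes pair a Kafka sink expects.

    Assumes the key is a str and the value is JSON-serializable.
    """
    key, value = element
    return key.encode("utf-8"), json.dumps(value).encode("utf-8")


# Hypothetical use before the write step, declaring the output type so
# that a bytes-based coder can be chosen instead of the pickle coder:
#   | beam.Map(to_kafka_kv).with_output_types(Tuple[bytes, bytes])
#   | WriteToKafka(producer_config={...}, topic='gggg')
```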


>
> Thanks,
> Yilun
>
