Hey,

We are running the Beam Python SDK with Kafka using environment_type=DOCKER, and we hit the following error:

Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
    at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:112)
    at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:180)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)



Here is the simple code we have:

from __future__ import absolute_import

import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipeline():
    def log_i(element):
        logging.info(element)

    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'read from kafka' >> ReadFromKafka(
             consumer_config={'bootstrap.servers': 'x.x.x.x:9092'},
             topics=['gggg'])
         # Map rather than FlatMap: log_i returns None, and FlatMap
         # would try to iterate over that.
         | 'print' >> beam.Map(lambda kv: log_i(kv[1])))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run_pipeline()


The command we run to submit the job:

python -m sample --runner=FlinkRunner --flink_version 1.12 \
    --flink_master=${FLINK_MASTER_URL} --environment_type=DOCKER \
    --save_main_session --flink_submit_uber_jar

Once the job is up, we manually push records through the Kafka console producer:

./kafka-console-producer.sh --broker-list x.x.x.x:9092 --topic gggg \
    --property "parse.key=true" --property "key.separator=:"
>tryKey:tryValue
>kkey:vvalue
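
For reference, producing the same keyed records from Python would look roughly like the sketch below. This is only to show the record shape we intend; the kafka-python client and the serializers are our own choice, not something the pipeline depends on.

import json

from kafka import KafkaProducer  # kafka-python, assumed installed

producer = KafkaProducer(
    bootstrap_servers='x.x.x.x:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Every record gets a non-null key and a non-null value, so the
# Beam Kafka source should never have to encode a null byte[].
producer.send('gggg', key='tryKey', value={'word': 'tryValue'})
producer.flush()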

After some searching, it looks like this error may come from the format of the
Kafka input (or maybe not?). Is there any suggestion on what format the
records in Kafka should have? And is there a way to handle JSON data? The
sketch below shows what we have in mind.
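
This is only a guess on our part, assuming the values arrive as UTF-8 encoded JSON bytes:

import json

import apache_beam as beam


def parse_record(kv):
    # kv is the (key, value) pair of byte strings from ReadFromKafka;
    # we assume the value holds UTF-8 encoded JSON.
    return json.loads(kv[1].decode('utf-8'))

# ... in the pipeline, after the ReadFromKafka step:
# | 'parse json' >> beam.Map(parse_record)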

Also, when we try to write to Kafka, we get the error: Unknown Coder URN
beam:coder:pickled_python:v1. Is there any suggestion for this one? Our write
step looks roughly like the sketch below.
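
From our reading, the cross-language sink may need the elements declared as pairs of bytes so that Beam does not fall back to the Python pickle coder; the with_output_types hint here is our guess, not something we have confirmed:

import typing

import apache_beam as beam
from apache_beam.io.kafka import WriteToKafka

with beam.Pipeline() as p:
    (p
     | 'create' >> beam.Create([(b'tryKey', b'tryValue')])
     # Declare (bytes, bytes) pairs explicitly so the cross-language
     # transform can pick a byte-array coder instead of pickling.
     | 'to kv bytes' >> beam.Map(lambda kv: kv).with_output_types(
         typing.Tuple[bytes, bytes])
     | 'write to kafka' >> WriteToKafka(
         producer_config={'bootstrap.servers': 'x.x.x.x:9092'},
         topic='gggg'))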

Thanks,
Yilun
