Hi,

I am trying to build a streaming Beam pipeline in Python that reads
messages from Kafka and then runs further stages that fetch data from
other sources and aggregate it. Here is what I have built so far, step
by step:

   1. Run a Kafka instance on localhost:9092:

      ./bin/kafka-server-start.sh ./config/server.properties

   2. Run the Beam Flink job server using Docker:

      docker run --net=host apache/beam_flink1.10_job_server:latest

   3. Run the Beam Kafka pipeline:

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

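if __name__ == '__main__':
    options = PipelineOptions([
        "--job_endpoint=localhost:8099",
        # LOOPBACK runs the Python SDK worker inside this process.
        "--environment_type=LOOPBACK",
        "--streaming",
        # Only used by the PROCESS environment type; LOOPBACK ignores it.
        "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
    ])

    # Same effect as the --streaming flag above.
    options = options.view_as(StandardOptions)
    options.streaming = True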

    pipeline = beam.Pipeline(options=options)

    result = (
        pipeline

        | "Read from kafka" >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": 'localhost:9092',
            },
            topics=['mytopic'],
            expansion_service='localhost:8097',
        )

        | beam.Map(print)
    )

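    # Block until the streaming job ends; with LOOPBACK the SDK worker
    # lives in this process, so the script must stay alive.
    pipeline.run().wait_until_finish()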


   4. Publish a new message using kafka-console-producer.sh:

      ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
      >tryme
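
For the later stages, the messages need to be decoded. With its default
byte-array deserializers, ReadFromKafka passes each record on as a
(key, value) tuple of bytes, so a decode step could look like the minimal
sketch that follows. The function name decode_record is hypothetical, and
either side may be None, for example when a message is published without
a key:

```python
from typing import Optional, Tuple


def decode_record(
    record: Tuple[Optional[bytes], Optional[bytes]]
) -> Tuple[Optional[str], Optional[str]]:
    """Decode a (key, value) pair of raw Kafka bytes into UTF-8 strings.

    Hypothetical helper: assumes ReadFromKafka's default byte-array
    deserializers. A None key or value is passed through unchanged
    instead of being decoded.
    """
    key, value = record
    decoded_key = key.decode("utf-8") if key is not None else None
    decoded_value = value.decode("utf-8") if value is not None else None
    return decoded_key, decoded_value
```

In the pipeline it would sit between ReadFromKafka and beam.Map(print),
e.g. `| "Decode" >> beam.Map(decode_record)`. It only runs after KafkaIO
has emitted an element, so by itself it cannot work around a failure
raised inside the Java KafkaIO transform.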

After I publish this test message, the Beam pipeline receives it but
crashes with this error:

RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
    at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
    at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
    at org.apache.beam

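One possible reading of the trace: KafkaIO fails while outputting a
key/value pair whose byte[] is null, and kafka-console-producer.sh sends
records with a null key unless it is started with
`--property parse.key=true --property key.separator=:` (then typing e.g.
`test-key:tryme`). To test that guess from Python, a minimal sketch that
publishes a keyed message could look like this. It assumes the
kafka-python package, which is not part of the setup above, and the key
"test-key" is a hypothetical placeholder:

```python
from typing import Tuple


def encode_keyed_message(key: str, value: str) -> Tuple[bytes, bytes]:
    """Encode a key/value pair as UTF-8 bytes for a Kafka record."""
    return key.encode("utf-8"), value.encode("utf-8")


def publish(topic: str, key: str, value: str,
            servers: str = "localhost:9092") -> None:
    """Send one record with a non-null key (assumes kafka-python)."""
    # Imported here so encode_keyed_message works without kafka-python.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=servers)
    key_bytes, value_bytes = encode_keyed_message(key, value)
    # send() is asynchronous; get() blocks until the broker acknowledges.
    producer.send(topic, key=key_bytes, value=value_bytes).get(timeout=10)
    producer.close()
```

Usage, with the broker from step 1 running:
`publish("mytopic", "test-key", "tryme")`.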
Regards,

Ayush Sharma.
