Thank you all for the replies. I am really stuck and cannot proceed further. Yes, the trial message I published previously had a null key. But when I send a key:value pair through the producer using

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=:"
    > tryKey:tryValue

I do not get any error, but Beam does not print the received message. Here is how my pipeline looks:

    result = (
        pipeline
        | "Read from kafka" >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": 'localhost:9092',
            },
            topics=['mytopic'],
            expansion_service='localhost:8097',
        )
        | "print" >> beam.Map(print)
    )

If this is not the right way to make Beam and Kafka communicate, then please share a working example that shows how a message published to Kafka gets received by Beam while streaming.

Regards,
Ayush Sharma

On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <chamik...@google.com> wrote:

> Yes, seems like this is due to the key being null. XLang KafkaIO has to be
> updated to support this. You should not run into this error if you publish
> keys and values that are not null.
>
> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>
>> +dev <d...@beam.apache.org>
>>
>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> +Heejong Lee <heej...@google.com> +Chamikara Jayalath
>>> <chamik...@google.com>
>>>
>>> Do you know if your trial record has an empty key or value?
>>> If so, then you hit a bug and it seems as though there was a miss
>>> supporting this usecase.
>>>
>>> Heejong and Cham,
>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>> Deserializers can return null[1, 2] and we aren't using
>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>> non XLang KafkaIO does this correctly in its regular coder inference
>>> logic[4].
>>> I filed BEAM-10529[5]
>>>
>>> 1: https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>> 2: https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>> 3: https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>> 4: https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>
>>>
>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <1705ay...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to build a streaming beam pipeline in python which should
>>>> capture messages from kafka and then execute further stages of data
>>>> fetching from other sources and aggregation. The step-by-step process of
>>>> what I have built till now is:
>>>>
>>>> 1.
>>>>
>>>> Running Kafka instance on localhost:9092
>>>>
>>>>     ./bin/kafka-server-start.sh ./config/server.properties
>>>> 2.
>>>>
>>>> Run beam-flink job server using docker
>>>>
>>>>     docker run --net=host apache/beam_flink1.10_job_server:latest
>>>> 3.
>>>>
>>>> Run beam-kafka pipeline
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>
>>>> if __name__ == '__main__':
>>>>     options = PipelineOptions([
>>>>         "--job_endpoint=localhost:8099",
>>>>         "--environment_type=LOOPBACK",
>>>>         "--streaming",
>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>     ])
>>>>
>>>>     options = options.view_as(StandardOptions)
>>>>     options.streaming = True
>>>>
>>>>     pipeline = beam.Pipeline(options=options)
>>>>
>>>>     result = (
>>>>         pipeline
>>>>
>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>             consumer_config={
>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>             },
>>>>             topics=['mytopic'],
>>>>             expansion_service='localhost:8097',
>>>>         )
>>>>
>>>>         | beam.Map(print)
>>>>     )
>>>>
>>>>     pipeline.run()
>>>>
>>>>
>>>> 4. Publish new message using kafka-console-producer.sh
>>>>
>>>>     ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>     >tryme
>>>>
>>>> After publishing this trial message, the beam pipeline perceives the
>>>> message but crashes giving this error:
>>>>
>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>> org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>> at
>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>> at
>>>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>> at
>>>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>> at org.apache.beam
>>>>
>>>> Regards,
>>>>
>>>> Ayush Sharma.
>>>>
>>>>
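
The "cannot encode a null byte[]" error in the trace above, and the NullableCoder.of(ByteArrayCoder.of()) fix suggested in the thread, can be illustrated with a small sketch. A plain byte-array coder has no representation for a missing value, while a nullable wrapper writes a one-byte presence marker before delegating to the inner coder. The code below is only a pure-Python illustration of that idea and is not Beam's coder implementation; the function names, the marker constants, and the 4-byte length prefix are all assumptions made up for this example.

```python
import struct
from typing import Optional

# Hypothetical one-byte markers for this sketch (not taken from Beam's source).
NULL_MARKER = b"\x00"     # no value follows
PRESENT_MARKER = b"\x01"  # an encoded value follows


def encode_bytes(value: bytes) -> bytes:
    """Length-prefixed encoding that, like a plain byte-array coder, rejects None."""
    if value is None:
        raise ValueError("cannot encode a null byte[]")
    return struct.pack(">I", len(value)) + value


def decode_bytes(data: bytes) -> bytes:
    """Inverse of encode_bytes: read the 4-byte length, then that many bytes."""
    (length,) = struct.unpack(">I", data[:4])
    return data[4:4 + length]


def encode_nullable(value: Optional[bytes]) -> bytes:
    """Wrap encode_bytes so that None becomes a single marker byte."""
    if value is None:
        return NULL_MARKER
    return PRESENT_MARKER + encode_bytes(value)


def decode_nullable(data: bytes) -> Optional[bytes]:
    """Inverse of encode_nullable."""
    if data[:1] == NULL_MARKER:
        return None
    return decode_bytes(data[1:])


if __name__ == "__main__":
    # A record published without a key (as with the "tryme" message) has key None.
    for key in (None, b"tryKey"):
        roundtrip = decode_nullable(encode_nullable(key))
        print(repr(key), "->", repr(roundtrip))
```

With the wrapper, a keyless record round-trips as None instead of raising, which is the behaviour the thread says the non-XLang KafkaIO already gets from its coder inference.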