+Heejong Lee <heej...@google.com> +Chamikara Jayalath <chamik...@google.com>
Do you know if your trial record has an empty key or value?

If so, then you hit a bug: it looks like support for this use case was missed.

Heejong and Cham, according to the Javadoc, ByteArrayDeserializer and other Deserializers can return null [1, 2], and we aren't using NullableCoder.of(ByteArrayCoder.of()) in the expansion [3]. Note that the non-XLang KafkaIO does this correctly in its regular coder inference logic [4]. I filed BEAM-10529 [5].

1: https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
2: https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
3: https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
4: https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
5: https://issues.apache.org/jira/browse/BEAM-10529

On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <1705ay...@gmail.com> wrote:

> Hi,
>
> I am trying to build a streaming beam pipeline in python which should
> capture messages from kafka and then execute further stages of data
> fetching from other sources and aggregation. The step-by-step process of
> what I have built till now is:
>
> 1. Running Kafka instance on localhost:9092
>
>    ./bin/kafka-server-start.sh ./config/server.properties
>
> 2. Run beam-flink job server using docker
>
>    docker run --net=host apache/beam_flink1.10_job_server:latest
>
> 3. Run beam-kafka pipeline
>
>    import apache_beam as beam
>    from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>
>    if __name__ == '__main__':
>        options = PipelineOptions([
>            "--job_endpoint=localhost:8099",
>            "--environment_type=LOOPBACK",
>            "--streaming",
>            "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>        ])
>
>        options = options.view_as(StandardOptions)
>        options.streaming = True
>
>        pipeline = beam.Pipeline(options=options)
>
>        result = (
>            pipeline
>            | "Read from kafka" >> ReadFromKafka(
>                consumer_config={
>                    "bootstrap.servers": 'localhost:9092',
>                },
>                topics=['mytopic'],
>                expansion_service='localhost:8097',
>            )
>            | beam.Map(print)
>        )
>
>        pipeline.run()
>
> 4. Publish new message using kafka-producer.sh
>
>    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>    >tryme
>
> After publishing this trial message, the beam pipeline perceives the
> message but crashes giving this error:
>
> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>     at org.apache.beam
>
> Regards,
>
> Ayush Sharma.
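
For anyone following along, here is a rough sketch of why a record with a null key or value breaks a plain byte-array coder while a nullable wrapper does not. This is a simplified, self-contained illustration in plain Python and not Beam's actual code: the class names BytesCoder and NullableCoder below are hypothetical stand-ins, and the one-byte marker layout is an assumption made for the example, not a statement about Beam's wire format.

```python
from typing import Optional


class BytesCoder:
    """Hypothetical stand-in for a coder that rejects None values."""

    def encode(self, value: bytes) -> bytes:
        if value is None:
            # Mirrors the failure mode reported in the thread.
            raise ValueError("cannot encode a null byte[]")
        return value

    def decode(self, data: bytes) -> bytes:
        return data


class NullableCoder:
    """Hypothetical wrapper that prefixes a marker byte so None can round-trip."""

    NULL_MARKER = b"\x00"     # assumed marker for "no value"
    PRESENT_MARKER = b"\x01"  # assumed marker for "value follows"

    def __init__(self, inner: BytesCoder) -> None:
        self._inner = inner

    def encode(self, value: Optional[bytes]) -> bytes:
        if value is None:
            return self.NULL_MARKER
        return self.PRESENT_MARKER + self._inner.encode(value)

    def decode(self, data: bytes) -> Optional[bytes]:
        if data[:1] == self.NULL_MARKER:
            return None
        return self._inner.decode(data[1:])


# A record published without a key: the key is None, the value is b"tryme".
key, value = None, b"tryme"

nullable = NullableCoder(BytesCoder())
assert nullable.decode(nullable.encode(key)) is None
assert nullable.decode(nullable.encode(value)) == b"tryme"
```

Passing the same None key straight to the unwrapped BytesCoder raises the error instead, which is the situation the cross-language expansion appears to be in until it wraps its coders the way the regular coder inference logic does.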