Yeah, this is a known issue. According to +Boyuan Zhang <boyu...@google.com>'s comment in the bug, you should still be able to read as long as the Kafka cluster is set up to auto-commit, and these errors can be safely ignored. For example, you can set "enable.auto.commit" to "true" in the consumer config passed to ReadFromKafka. I haven't tried this myself though, so please comment in the JIRA if this is a true blocker for reading from Kafka; that will help us identify the correct priority for the issue.
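For what it's worth, the suggestion above would look roughly like the following in the Python SDK (a sketch only — I haven't verified that it makes the commit errors go away, and the broker address is a placeholder). The dict is forwarded verbatim to the underlying Java KafkaConsumer, which expects all values as strings:

```python
# Consumer config passed to ReadFromKafka; forwarded to the Java
# KafkaConsumer, so every value must be a string.
consumer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "enable.auto.commit": "true",  # let the Kafka client commit offsets itself
}
```

You would then pass this dict as the `consumer_config` argument of `ReadFromKafka`, alongside `topics` and `expansion_service` as in the pipeline below.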
Thanks,
Cham

On Mon, Jul 20, 2020 at 2:40 PM ayush sharma <1705ay...@gmail.com> wrote:

> Is there any workaround to this issue?
>
> On Mon, Jul 20, 2020 at 5:33 PM ayush sharma <1705ay...@gmail.com> wrote:
>
>> Thank you for the suggestions. I tried using FlinkRunner, and setting
>> environment_type to either DOCKER or LOOPBACK gives an error:
>> java.lang.UnsupportedOperationException: The ActiveBundle does not have a
>> registered bundle checkpoint handler.
>>
>> I found that this issue has been reported
>> (https://issues.apache.org/jira/browse/BEAM-6868) and hence upvoted it.
>> Thank you for the prompt responses, and I'm looking forward to using this
>> feature in the future.
>>
>> Regards,
>> Ayush.
>>
>> On Sat, Jul 18, 2020 at 3:14 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>
>>> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <1705ay...@gmail.com> wrote:
>>>
>>>> Thank you guys for the reply. I am really stuck and could not proceed
>>>> further.
>>>> Yes, the previous trial message was published with a null key.
>>>> But when I send a key:value pair through the producer using
>>>>
>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>>> mytopic --property "parse.key=true" --property "key.separator=:"
>>>> > tryKey:tryValue
>>>>
>>>> I do not get any error, but Beam does not print the received message.
>>>> Here is what my pipeline looks like:
>>>>
>>>> result = (
>>>>     pipeline
>>>>     | "Read from kafka" >> ReadFromKafka(
>>>>         consumer_config={
>>>>             "bootstrap.servers": 'localhost:9092',
>>>>         },
>>>>         topics=['mytopic'],
>>>>         expansion_service='localhost:8097',
>>>>     )
>>>>     | "print" >> beam.Map(print)
>>>> )
>>>
>>> I suspect DirectRunner in LOOPBACK mode might not be working for
>>> cross-language transforms today. Please note that the cross-language
>>> transforms framework is fairly new [1] and we are adding support for
>>> various runners and environment configurations.
>>> Can you try with Flink in DOCKER mode?
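For reference, switching the pipeline above to Flink in DOCKER mode is mostly a matter of the pipeline options. A hedged sketch — the flag names below are the portable-runner flags as I understand them, and the job endpoint assumes the job server's default port 8099:

```python
# Portable pipeline options for running against the Flink job server
# started with: docker run --net=host apache/beam_flink1.10_job_server:latest
# With DOCKER (the default environment_type) the SDK harness runs in its
# own container, so no environment_config command is needed.
flink_docker_args = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=DOCKER",
    "--streaming",
]
```

This list would be passed to `PipelineOptions(flink_docker_args)` in place of the LOOPBACK options shown earlier in the thread.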
>>>
>>>> If this is not the way we make Beam and Kafka communicate, then please
>>>> share a working example which showcases how a message published in Kafka
>>>> gets received by Beam while streaming.
>>>
>>> I'm adding an example, but I've only tested it with Dataflow so far. I
>>> hope to test that example for more runners and add additional
>>> instructions there.
>>> https://github.com/apache/beam/pull/12188
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>>>
>>>> Regards,
>>>> Ayush Sharma
>>>>
>>>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>
>>>>> Yes, seems like this is due to the key being null. XLang KafkaIO has
>>>>> to be updated to support this. You should not run into this error if
>>>>> you publish keys and values that are not null.
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> +dev <d...@beam.apache.org>
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> +Heejong Lee <heej...@google.com> +Chamikara Jayalath <chamik...@google.com>
>>>>>>>
>>>>>>> Do you know if your trial record has an empty key or value?
>>>>>>> If so, then you hit a bug, and it seems as though there was a miss
>>>>>>> supporting this use case.
>>>>>>>
>>>>>>> Heejong and Cham,
>>>>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>>>>> Deserializers says they can return null [1, 2], and we aren't using
>>>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion [3]. Note that
>>>>>>> the non-XLang KafkaIO handles this correctly in its regular coder
>>>>>>> inference logic [4].
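To illustrate the coder issue in plain Python (a toy model only, not Beam's actual wire format): a plain byte coder has no way to represent None, which is exactly what kafka-console-producer sends as the key by default, while a nullable wrapper spends one prefix byte on a present/absent flag:

```python
def encode_bytes(value):
    """Toy stand-in for ByteArrayCoder: cannot represent None."""
    if value is None:
        # This is the failure mode in the stack trace below.
        raise ValueError("cannot encode a null byte[]")
    return value

def encode_nullable_bytes(value):
    """Toy stand-in for NullableCoder.of(ByteArrayCoder.of()):
    one present/absent flag byte, then the payload if present."""
    if value is None:
        return b"\x00"
    return b"\x01" + value

# A record published with no key, as kafka-console-producer does by default:
assert encode_nullable_bytes(None) == b"\x00"
assert encode_nullable_bytes(b"tryme") == b"\x01tryme"
```

The fix tracked in BEAM-10529 is to use the nullable variant in the cross-language expansion, as the non-XLang KafkaIO already does.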
>>>>>>> I filed BEAM-10529 [5].
>>>>>>>
>>>>>>> 1: https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>>> 2: https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>>> 3: https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>>>> 4: https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>>>
>>>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <1705ay...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am trying to build a streaming Beam pipeline in Python which
>>>>>>>> should capture messages from Kafka and then execute further stages
>>>>>>>> of data fetching from other sources and aggregation. The
>>>>>>>> step-by-step process of what I have built so far is:
>>>>>>>>
>>>>>>>> 1. Run a Kafka instance on localhost:9092
>>>>>>>>
>>>>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>>>
>>>>>>>> 2. Run the beam-flink job server using Docker
>>>>>>>>
>>>>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>>>
>>>>>>>> 3. Run the beam-kafka pipeline
>>>>>>>>
>>>>>>>> import apache_beam as beam
>>>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>>>
>>>>>>>> if __name__ == '__main__':
>>>>>>>>     options = PipelineOptions([
>>>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>>>         "--environment_type=LOOPBACK",
>>>>>>>>         "--streaming",
>>>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>>>     ])
>>>>>>>>
>>>>>>>>     options = options.view_as(StandardOptions)
>>>>>>>>     options.streaming = True
>>>>>>>>
>>>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>>>
>>>>>>>>     result = (
>>>>>>>>         pipeline
>>>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>>>             consumer_config={
>>>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>>>             },
>>>>>>>>             topics=['mytopic'],
>>>>>>>>             expansion_service='localhost:8097',
>>>>>>>>         )
>>>>>>>>         | beam.Map(print)
>>>>>>>>     )
>>>>>>>>
>>>>>>>>     pipeline.run()
>>>>>>>>
>>>>>>>> 4. Publish a new message using kafka-console-producer.sh
>>>>>>>>
>>>>>>>>    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>>>>    >tryme
>>>>>>>>
>>>>>>>> After publishing this trial message, the Beam pipeline receives
>>>>>>>> the message but crashes with this error:
>>>>>>>>
>>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>>>>>> org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>>>     at org.apache.beam
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Ayush Sharma.