Thank you for the suggestions. I tried using FlinkRunner, but setting environment_type to either DOCKER or LOOPBACK gives an error: java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.
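Cham's suggestion and the attempt above both submit the same pipeline to the Flink job server with a different SDK harness environment. The original options in step 3 below set no `--runner`, which is why Cham suspected DirectRunner. Below is a minimal sketch of the corresponding arguments. It assumes the job server from the original post is reachable at localhost:8099 and uses `--runner=PortableRunner` to submit through that endpoint (FlinkRunner, which Ayush used, is an alternative). The helper name `flink_portable_args` is hypothetical. The `--environment_config` JSON with a `command` key is left out; as far as I know, that form applies to PROCESS environments. Per BEAM-6868, this setup still hit the checkpoint-handler error at the time of this thread, so the sketch documents the configuration rather than fixing it.

```python
from typing import List

# Environment types discussed in this thread. Beam has others (e.g. PROCESS,
# EXTERNAL); this sketch deliberately rejects them.
SUPPORTED_ENVIRONMENTS = ("DOCKER", "LOOPBACK")


def flink_portable_args(job_endpoint: str = "localhost:8099",
                        environment_type: str = "DOCKER") -> List[str]:
    """Return pipeline arguments for a Flink job server (hypothetical helper).

    Assumes the job server was started as in the original post with
    `docker run --net=host apache/beam_flink1.10_job_server:latest`.
    """
    if environment_type not in SUPPORTED_ENVIRONMENTS:
        raise ValueError(f"unsupported environment_type: {environment_type}")
    return [
        "--runner=PortableRunner",                 # submit via the job endpoint
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={environment_type}",  # where SDK workers run
        "--streaming",                             # the Kafka read is unbounded
    ]
```

The list can be passed as `PipelineOptions(flink_portable_args())` in place of the hand-written list in step 3.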
I found that this issue has already been reported (https://issues.apache.org/jira/browse/BEAM-6868), so I have upvoted it. Thank you for the prompt responses; I look forward to using this feature in the future.

Regards,
Ayush.

On Sat, Jul 18, 2020 at 3:14 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
>
> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <1705ay...@gmail.com> wrote:
>
>> Thank you guys for the reply. I am really stuck and cannot proceed
>> further.
>> Yes, the message I published in the previous trial had a null key.
>> But when I send a key:value pair through the producer using
>>
>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>> mytopic --property "parse.key=true" --property "key.separator=:"
>> >tryKey:tryValue
>>
>> I do not get any error, but Beam does not print the received message. Here
>> is what my pipeline looks like:
>>
>> result = (
>>     pipeline
>>
>>     | "Read from kafka" >> ReadFromKafka(
>>         consumer_config={
>>             "bootstrap.servers": 'localhost:9092',
>>         },
>>         topics=['mytopic'],
>>         expansion_service='localhost:8097',
>>     )
>>
>>     | "print" >> beam.Map(print)
>> )
>>
>>
> I suspect DirectRunner in LOOPBACK mode might not be working for
> cross-language transforms today. Please note that the cross-language
> transforms framework is fairly new [1] and we are adding support for
> various runners and environment configurations.
> Can you try with Flink in DOCKER mode?
>
>
>> If this is not the way to make Beam and Kafka communicate, then please
>> share a working example that shows how a message published to Kafka
>> is received by Beam while streaming.
>>
>
> I'm adding an example, but so far I've only tested it with Dataflow. I hope
> to test that example with more runners and add additional instructions
> there.
> https://github.com/apache/beam/pull/12188
>
> Thanks,
> Cham
>
> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>
>>
>> Regards,
>> Ayush Sharma
>>
>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Yes, it seems this is due to the key being null. XLang KafkaIO has to
>>> be updated to support this. You should not run into this error if you
>>> publish keys and values that are not null.
>>>
>>>
>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> +dev <d...@beam.apache.org>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> +Heejong Lee <heej...@google.com> +Chamikara Jayalath
>>>>> <chamik...@google.com>
>>>>>
>>>>> Do you know if your trial record has an empty key or value?
>>>>> If so, then you hit a bug; it seems as though support for this
>>>>> use case was missed.
>>>>>
>>>>> Heejong and Cham,
>>>>> According to the Javadoc, ByteArrayDeserializer and other
>>>>> Deserializers can return null [1, 2], and we aren't using
>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion [3]. Note that
>>>>> the non-XLang KafkaIO does this correctly in its regular coder inference
>>>>> logic [4].
>>>>> I filed BEAM-10529 [5].
>>>>>
>>>>> 1:
>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>> 2:
>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>> 3:
>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>> 4:
>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <1705ay...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to build a streaming Beam pipeline in Python that should
>>>>>> capture messages from Kafka and then execute further stages of data
>>>>>> fetching from other sources and aggregation. The step-by-step process
>>>>>> of what I have built so far is:
>>>>>>
>>>>>> 1. Run a Kafka instance on localhost:9092
>>>>>>
>>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>
>>>>>> 2. Run the Beam Flink job server using Docker
>>>>>>
>>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>
>>>>>> 3. Run the Beam Kafka pipeline
>>>>>>
>>>>>>    import apache_beam as beam
>>>>>>    from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>>    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>
>>>>>>    if __name__ == '__main__':
>>>>>>        options = PipelineOptions([
>>>>>>            "--job_endpoint=localhost:8099",
>>>>>>            "--environment_type=LOOPBACK",
>>>>>>            "--streaming",
>>>>>>            "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>        ])
>>>>>>
>>>>>>        options = options.view_as(StandardOptions)
>>>>>>        options.streaming = True
>>>>>>
>>>>>>        pipeline = beam.Pipeline(options=options)
>>>>>>
>>>>>>        result = (
>>>>>>            pipeline
>>>>>>
>>>>>>            | "Read from kafka" >> ReadFromKafka(
>>>>>>                consumer_config={
>>>>>>                    "bootstrap.servers": 'localhost:9092',
>>>>>>                },
>>>>>>                topics=['mytopic'],
>>>>>>                expansion_service='localhost:8097',
>>>>>>            )
>>>>>>
>>>>>>            | beam.Map(print)
>>>>>>        )
>>>>>>
>>>>>>        pipeline.run()
>>>>>>
>>>>>> 4. Publish a new message using kafka-console-producer.sh
>>>>>>
>>>>>>    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>>    >tryme
>>>>>>
>>>>>> After this trial message is published, the Beam pipeline receives the
>>>>>> message but crashes with this error:
>>>>>>
>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>>>> org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>     at org.apache.beam
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Ayush Sharma.
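Luke's diagnosis is that Kafka deserializers may return null, but the cross-language expansion encodes keys and values with a plain `ByteArrayCoder` instead of `NullableCoder.of(ByteArrayCoder.of())`. The pure-Python sketch below models that difference. It is neither Beam's API nor its exact wire format. It borrows the idea behind Beam's `NullableCoder`: write a one-byte marker (0 for null, 1 for present) before a length-prefixed payload. The plain byte encoder, by contrast, rejects `None` with an error that mirrors the one in the trace. All function names here are hypothetical.

```python
from typing import Optional, Tuple

NULL_MARKER = 0     # marker byte written for a null (None) value
PRESENT_MARKER = 1  # marker byte written before a non-null payload


def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as an unsigned base-128 varint."""
    if n < 0:
        raise ValueError("varint must be non-negative")
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def decode_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_bytes(value: Optional[bytes]) -> bytes:
    """Length-prefixed bytes. Like a plain byte[] coder, it cannot handle None."""
    if value is None:
        # Message mirrors the CoderException in the stack trace above.
        raise TypeError("cannot encode a null byte[]")
    return encode_varint(len(value)) + value


def decode_bytes(buf: bytes, pos: int) -> Tuple[bytes, int]:
    length, pos = decode_varint(buf, pos)
    return buf[pos:pos + length], pos + length


def encode_nullable_bytes(value: Optional[bytes]) -> bytes:
    """Nullable wrapper: marker byte, then the inner encoding if present."""
    if value is None:
        return bytes([NULL_MARKER])
    return bytes([PRESENT_MARKER]) + encode_bytes(value)


def decode_nullable_bytes(buf: bytes, pos: int) -> Tuple[Optional[bytes], int]:
    marker = buf[pos]
    pos += 1
    if marker == NULL_MARKER:
        return None, pos
    if marker != PRESENT_MARKER:
        raise ValueError(f"unexpected marker byte: {marker}")
    return decode_bytes(buf, pos)


def encode_record(key: Optional[bytes], value: Optional[bytes]) -> bytes:
    """Encode a Kafka-style (key, value) pair where either side may be null."""
    return encode_nullable_bytes(key) + encode_nullable_bytes(value)


def decode_record(buf: bytes) -> Tuple[Optional[bytes], Optional[bytes]]:
    key, pos = decode_nullable_bytes(buf, 0)
    value, _ = decode_nullable_bytes(buf, pos)
    return key, value
```

Under this scheme, the keyless `tryme` record from step 4 encodes as `b"\x00\x01\x05tryme"`: a null-key marker, a present marker, a length of 5, then the payload. Passing the same `None` key to `encode_bytes` raises the error instead. This matches Cham's observation that non-null keys and values avoid the failure.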