/cc @Boyuan Zhang <boyu...@google.com> for Kafka and @Chamikara Jayalath <chamik...@google.com> for multi-language; they might be able to help.
On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:

> Hi all,
>
> I have created a simple snippet as such:
>
>     import apache_beam as beam
>     from apache_beam.io.kafka import ReadFromKafka
>     from apache_beam.options.pipeline_options import PipelineOptions
>
>     import logging
>     logging.basicConfig(level=logging.WARNING)
>
>     opts = direct_opts
>     with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner", "--streaming"])) as p:
>         (
>             p
>             | "read" >> ReadFromKafka({"bootstrap.servers": f"localhost:9092"}, topics=["topic"])
>             | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>         )
>
> I've set up a Kafka single node similar to the kafka_taxi README, and run this both on DirectRunner and DataflowRunner, but it doesn't work. What I mean by this is that the transform seems to be capturing data but doesn't pass it on to subsequent transforms.
> With DirectRunner, if I send a non-keyed Kafka message to the server it actually crashes (saying that it cannot encode null into a byte[]), hence why I believe the transform is actually running.
>
> My main objective really is to create a streaming ExternalTransform for MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam). I've implemented the builder and registrars, and they work in batch mode (with maxNumRecords), but otherwise they fail to read.
>
> With MqttIO, the streaming transform gets stuck waiting for one bundle to complete (if I continuously send messages into the MQTT server), and after stopping, the bundles finish but nothing gets passed on either.
>
> I appreciate any help I can get with this.
> Thanks!
>
> Cheers
> Alex
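One thing worth checking in the snippet above: the `"log"` step uses `beam.FlatMap` with a lambda whose body is `logging.error(...)`, and `logging.error` returns `None`. A flat-map callable is expected to return an iterable whose elements become the output, so a `None`-returning side-effect lambda is a likely source of trouble. Below is a minimal pure-Python sketch of flat-map semantics (an illustration of the general contract, not Beam's actual implementation; the `flat_map` helper is hypothetical):

```python
# Hypothetical sketch of FlatMap-style semantics: the callable must
# return an iterable, and its elements are flattened into the output.
def flat_map(fn, elements):
    out = []
    for e in elements:
        out.extend(fn(e))  # raises TypeError if fn returns None
    return out

# A callable that returns a list behaves as expected:
print(flat_map(lambda x: [x, x], [1, 2]))  # [1, 1, 2, 2]

# A side-effect-only callable (like the logging lambda above) returns
# None, which cannot be flattened:
try:
    flat_map(lambda x: print(x), [1])
except TypeError as err:
    print("TypeError:", err)
```

If the intent is one output element per input (or logging purely for its side effect), `beam.Map` may be the better fit, since it does not try to flatten the callable's return value.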