What error did you run into with Dataflow? Did you observe any errors in the
worker logs? If you follow the steps given in the example here
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>,
it should work. Make sure the Dataflow workers have access to the Kafka
bootstrap servers you provide.
Portable DirectRunner currently doesn't support streaming mode, so you need
to convert your pipeline to a batch pipeline and provide 'max_num_records'
or 'max_read_time' to convert the Kafka source to a batch source. This is
tracked in https://issues.apache.org/jira/browse/BEAM-7514. Also, portable
runners (Flink, Spark, etc.) have a known issue related to SDF checkpointing
in streaming mode which results in messages not being pushed to subsequent
steps. This is tracked in https://issues.apache.org/jira/browse/BEAM-11998.

Thanks,
Cham

On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay <al...@google.com> wrote:

> /cc @Boyuan Zhang <boyu...@google.com> for kafka, @Chamikara Jayalath
> <chamik...@google.com> for multi-language; they might be able to help.
>
> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:
>
>> Hi all,
>>
>> I have created a simple snippet as such:
>>
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> import logging
>> logging.basicConfig(level=logging.WARNING)
>>
>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
>>                                             "--streaming"])) as p:
>>     (
>>         p
>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>>                                   topics=["topic"])
>>         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>>     )
>>
>> I've set up a Kafka single node similar to the kafka_taxi README, and ran
>> this both on DirectRunner and DataflowRunner, but it doesn't work. What I
>> mean by this is that the transform seems to be capturing data but doesn't
>> pass it on to subsequent transforms.
>>
>> With DirectRunner, if I send a non-keyed Kafka message to the server, it
>> actually crashes (saying that it cannot encode null into a byte[]), which
>> is why I believe the transform is actually running.
>>
>> My main objective really is to create a streaming ExternalTransform for
>> MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam).
>> I've implemented the builder and registrars, and they work in batch mode
>> (with maxNumRecords), but otherwise they fail to read.
>>
>> With MqttIO, the streaming transform gets stuck waiting for one bundle to
>> complete (if I continuously send messages into the MQTT server), and after
>> stopping, the bundles finish but nothing gets passed on either.
>>
>> I appreciate any help I can get with this.
>> Thanks!
>>
>> Cheers
>> Alex
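[Editor's note] For reference, a minimal sketch of the batch-mode workaround
Cham describes: passing 'max_num_records' (or 'max_read_time') to
ReadFromKafka bounds the source so it can run on the portable DirectRunner.
The broker address and topic name are placeholders; running this assumes a
reachable Kafka broker and a Java environment for the cross-language
expansion service, so it is environment-dependent rather than a
self-contained unit of code.

```python
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

logging.basicConfig(level=logging.WARNING)

# No --streaming flag: the bounded read below makes this a batch pipeline,
# which is the workaround for BEAM-7514 on the portable DirectRunner.
with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
    (
        p
        | "read" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": "localhost:9092"},
            topics=["topic"],
            max_num_records=10,   # stop after 10 records, making the source bounded
            # max_read_time=60,   # alternatively, bound the read by time
        )
        | "log" >> beam.Map(lambda kv: logging.warning("%s", kv))
    )
```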