CC-ing Chamikara, as he was omitted from the reply-all I did earlier.

On Thu, Jun 3, 2021 at 12:43 AM Alex Koay <alexkoa...@gmail.com> wrote:
> Yeah, I figured it wasn't supported correctly on DirectRunner. I stumbled
> upon several threads saying so.
>
> On Dataflow, I've encountered a few different kinds of issues.
> 1. For the kafka_taxi example, the pipeline would start and the PubSub-to-
> Kafka step would run, but nothing gets read from Kafka (the read step does
> get expanded, as Dataflow shows the KafkaIO.Read and Remove Kafka Metadata
> sub-transforms).
> 2. For the snippet I shared above, I would vary it with either a "log"
> transform or a direct "write" back to Kafka. Neither seems to work (and the
> steps don't get expanded, unlike the kafka_taxi example). With the "write"
> step, I believe it didn't get captured in the Dataflow graph a few times.
> 3. No errors appear in either Job Logs or Worker Logs, except for the one
> message emitted from the "log" step when that happens.
>
> All this is happening while I am producing ~4 messages/sec into Kafka. I can
> verify that Kafka is reachable remotely and working normally (I ran into some
> issues setting it up).
> I've also tested the KafkaIO.read transform in Java and can confirm that it
> works as expected.
>
> As an aside, I put together an ExternalTransform for MqttIO, which you can
> find here:
> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
> I can confirm that it works in batch mode, but given that I couldn't get
> Kafka to work with Dataflow, I don't have much confidence in getting this
> to work either.
>
> Thanks for your help.
>
> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> What error did you run into with Dataflow? Did you observe any errors in
>> the worker logs?
>> If you follow the steps given in the example here
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>
>> it should work. Make sure the Dataflow workers have access to the Kafka
>> bootstrap servers you provide.
>>
>> Portable DirectRunner currently doesn't support streaming mode, so you
>> need to convert your pipeline to a batch pipeline and provide
>> 'max_num_records' or 'max_read_time' to convert the Kafka source to a
>> batch source.
>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>
>> Also, portable runners (Flink, Spark, etc.) have a known issue related to
>> SDF checkpointing in streaming mode which results in messages not being
>> pushed to subsequent steps. This is tracked in
>> https://issues.apache.org/jira/browse/BEAM-11998.
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay <al...@google.com> wrote:
>>
>>> /cc @Boyuan Zhang <boyu...@google.com> for Kafka and @Chamikara Jayalath
>>> <chamik...@google.com> for multi-language, who might be able to help.
>>>
>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have created a simple snippet as such:
>>>>
>>>> import logging
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.io.kafka import ReadFromKafka
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>> logging.basicConfig(level=logging.WARNING)
>>>>
>>>> opts = PipelineOptions(["--runner=DirectRunner", "--streaming"])
>>>> with beam.Pipeline(options=opts) as p:
>>>>     (
>>>>         p
>>>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>>>>                                   topics=["topic"])
>>>>         | "log" >> beam.Map(lambda x: logging.error("%s", str(x)))
>>>>     )
>>>>
>>>> I've set up a Kafka single node similar to the kafka_taxi README, and
>>>> run this both on DirectRunner and DataflowRunner, but it doesn't work.
>>>> What I mean by this is that the transform seems to be capturing data but
>>>> doesn't pass it on to subsequent transforms.
>>>> With DirectRunner, if I send a non-keyed Kafka message to the server, it
>>>> actually crashes (saying that it cannot encode null into a byte[]),
>>>> which is why I believe the transform is actually running.
>>>>
>>>> My main objective really is to create a streaming ExternalTransform for
>>>> MqttIO and SolaceIO
>>>> (https://github.com/SolaceProducts/solace-apache-beam).
>>>> I've implemented the builder and registrars, and they work in batch mode
>>>> (with maxNumRecords), but otherwise the transform fails to read.
>>>>
>>>> With MqttIO, the streaming transform gets stuck waiting for one bundle
>>>> to complete (if I continuously send messages into the MQTT server), and
>>>> after I stop sending, the bundles finish but nothing gets passed on
>>>> either.
>>>>
>>>> I appreciate any help I can get with this.
>>>> Thanks!
>>>>
>>>> Cheers,
>>>> Alex
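[Editor's note] One subtlety worth flagging in the snippet above, independent of the runner issues being discussed: beam.FlatMap requires its callable to return an iterable whose items become the output elements, so a callable that returns None (as logging.error does) fails at runtime, whereas beam.Map emits exactly one output per input and is the usual choice for a one-in, one-out logging step. The sketch below illustrates the difference in plain Python; map_ and flat_map are simplified stand-ins for the Beam transforms, not Beam's actual implementation:

```python
import itertools


def map_(fn, elements):
    # beam.Map semantics: exactly one output element per input element.
    return [fn(x) for x in elements]


def flat_map(fn, elements):
    # beam.FlatMap semantics: fn must return an iterable; its items are
    # flattened into the output PCollection.
    return list(itertools.chain.from_iterable(fn(x) for x in elements))


print(map_(lambda x: x * 2, [1, 2, 3]))       # [2, 4, 6]
print(flat_map(lambda x: [x, x], [1, 2, 3]))  # [1, 1, 2, 2, 3, 3]

# A callable that returns None (like logging.error) breaks FlatMap,
# because None is not iterable:
try:
    flat_map(lambda x: None, [1])
except TypeError as e:
    print("flat_map with a None-returning callable:", e)
```

This is why, even once the cross-language read is working, a "log" step written with FlatMap and logging.error would raise rather than forward elements.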