Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
upon several threads saying so.

On Dataflow, I've encountered a few different kinds of issues.
1. For the kafka_taxi example, the pipeline would start and the PubSub-to-
Kafka step would run, but nothing gets read from Kafka (the transform does
get expanded, as Dataflow shows KafkaIO.Read and Remove Kafka Metadata
sub-transforms).
2. For the snippet I shared above, I varied it with either a "log"
transform or a direct "write" back to Kafka. Neither seems to work (and,
unlike in the kafka_taxi example, the steps don't get expanded). A few
times the "write" step didn't even get captured in the Dataflow graph.
3. No errors appear in either the Job Logs or the Worker Logs, except for
the occasional message emitted from the "log" step when it fires.

All this is happening while I am producing ~4 messages/sec into Kafka. I
can verify that Kafka itself is working normally, including from remote
clients (I ran into some issues setting it up initially).
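For reference, the kind of check I mean is just a plain TCP probe; this is a
stdlib-only sketch (the host and port below are placeholders from my local
setup, not anything Beam-specific):

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # Resolves the host and attempts a full TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# e.g. broker_reachable("localhost", 9092) against my local broker
```

Running the same probe from a worker VM in the job's network would also rule
out a plain connectivity problem between the workers and the broker.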
I've also tested the KafkaIO.read transform in Java and can confirm that it
works as expected.

As an aside, I put together an ExternalTransform for MqttIO which you can
find here: https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
I can confirm that it works in batch mode, but given that I couldn't get
Kafka to work on Dataflow, I don't have much confidence in getting this one
to work either.

Thanks for your help.

On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath <chamik...@google.com>
wrote:

> What error did you run into with Dataflow? Did you observe any errors in
> the worker logs?
> If you follow the steps given in the example here
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>
> it should work. Make sure Dataflow workers have access to Kafka bootstrap
> servers you provide.
>
> Portable DirectRunner currently doesn't support streaming mode so you need
> to convert your pipeline to a batch pipeline and provide 'max_num_records'
> or 'max_read_time' to convert the Kafka source to a batch source.
> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>
> Also portable runners (Flink, Spark etc.) have a known issue related to
> SDF checkpointing in streaming mode which results in messages not being
> pushed to subsequent steps. This is tracked in
> https://issues.apache.org/jira/browse/BEAM-11998.
>
> Thanks,
> Cham
>
> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay <al...@google.com> wrote:
>
>> /cc @Boyuan Zhang <boyu...@google.com> for kafka @Chamikara Jayalath
>> <chamik...@google.com> for multi language might be able to help.
>>
>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have created a simple snippet as such:
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.kafka import ReadFromKafka
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>> import logging
>>> logging.basicConfig(level=logging.WARNING)
>>>
>>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
>>> "--streaming"])) as p:
>>>     (
>>>         p
>>>         | "read" >> ReadFromKafka({"bootstrap.servers":
>>> "localhost:9092"}, topics=["topic"])
>>>         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>>>     )
>>>
>>> I've set up a Kafka single node similar to the kafka_taxi README, and
>>> run this on both DirectRunner and DataflowRunner, but it doesn't work. What
>>> I mean by this is that the transform seems to be capturing data but
>>> doesn't pass it on to subsequent transforms.
>>> With DirectRunner, if I send a non-keyed Kafka message to the server, it
>>> actually crashes (saying that it cannot encode null into a byte[]), which
>>> is why I believe the transform is actually running.
>>>
>>> My main objective really is to create a streaming ExternalTransform for
>>> MqttIO and SolaceIO (
>>> https://github.com/SolaceProducts/solace-apache-beam).
>>> I've implemented the builder and registrars and they work in batch mode
>>> (with maxNumRecords) but otherwise it fails to read.
>>>
>>> With MqttIO, the streaming transform gets stuck waiting for one bundle
>>> to complete (if I continuously send messages into the MQTT server), and
>>> after stopping, the bundles finish but nothing gets passed on either.
>>>
>>> I appreciate any help I can get with this.
>>> Thanks!
>>>
>>> Cheers
>>> Alex
>>>
>>>
>>>
