What error did you run into with Dataflow? Did you observe any errors in the
worker logs?
If you follow the steps given in the example here
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>
it should work. Make sure the Dataflow workers have access to the Kafka
bootstrap servers you provide.
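
For reference, here's a minimal sketch of reading from Kafka on Dataflow
(the project, region, bucket, broker host, and topic below are placeholders
you'd replace with your own values):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder options. Note that 'localhost' won't work on Dataflow: the
# broker must be reachable from the worker VMs, not just your machine.
options = PipelineOptions([
    "--runner=DataflowRunner",
    "--streaming",
    "--project=<your-project>",
    "--region=<your-region>",
    "--temp_location=gs://<your-bucket>/tmp",
])

with beam.Pipeline(options=options) as p:
    (
        p
        | "read" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": "<broker-host>:9092"},
            topics=["<your-topic>"])
        | "log" >> beam.Map(print)
    )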

The portable DirectRunner currently doesn't support streaming mode, so you
need to convert your pipeline to a batch pipeline by providing 'max_num_records'
or 'max_read_time', which turns the Kafka source into a bounded (batch) source.
This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
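
For example, something like this (a sketch; the limits are arbitrary) makes
the Kafka read bounded so it can run as a batch pipeline:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# No '--streaming' flag. The read stops after 100 records or 60 seconds,
# whichever comes first, turning the unbounded source into a bounded one.
with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
    (
        p
        | "read" >> ReadFromKafka(
            {"bootstrap.servers": "localhost:9092"},
            topics=["topic"],
            max_num_records=100,
            max_read_time=60)
        | "log" >> beam.Map(print)
    )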

Also, portable runners (Flink, Spark, etc.) have a known issue related to SDF
checkpointing in streaming mode, which results in messages not being pushed
to subsequent steps. This is tracked in
https://issues.apache.org/jira/browse/BEAM-11998.

Thanks,
Cham

On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay <al...@google.com> wrote:

> /cc @Boyuan Zhang <boyu...@google.com> for kafka @Chamikara Jayalath
> <chamik...@google.com> for multi language might be able to help.
>
> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:
>
>> Hi all,
>>
>> I have created a simple snippet as such:
>>
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> import logging
>> logging.basicConfig(level=logging.WARNING)
>>
>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
>> "--streaming"])) as p:
>>     (
>>         p
>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>>                                   topics=["topic"])
>>         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>>     )
>>
>> I've set up a single-node Kafka instance similar to the kafka_taxi README,
>> and run this on both DirectRunner and DataflowRunner, but it doesn't work.
>> What I mean by this is that the transform seems to be capturing data, but
>> doesn't pass it on to subsequent transforms.
>> With DirectRunner, if I send a non-keyed Kafka message to the server, it
>> actually crashes (saying that it cannot encode null into a byte[]), which
>> is why I believe the transform is actually running.
>>
>> My main objective really is to create a streaming ExternalTransform for
>> MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam
>> ).
>> I've implemented the builder and registrars, and they work in batch mode
>> (with maxNumRecords), but otherwise they fail to read.
>>
>> With MqttIO, the streaming transform gets stuck waiting for one bundle to
>> complete (if I continuously send messages into the MQTT server); after I
>> stop sending, the bundles finish but nothing gets passed on either.
>>
>> I appreciate any help I can get with this.
>> Thanks!
>>
>> Cheers
>> Alex
>>
