CC-ing Chamikara, as he was omitted from the reply-all I did earlier.

On Thu, Jun 3, 2021 at 12:43 AM Alex Koay <alexkoa...@gmail.com> wrote:

> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
> upon several threads saying so.
>
> On Dataflow, I've encountered a few different kinds of issues.
> 1. For the kafka_taxi example, the pipeline would start and the PubSub to
> Kafka step would run, but nothing gets read from Kafka (the read does get
> expanded, as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
> sub-transforms).
> 2. For the snippet I shared above, I varied it with either a "log"
> transform or a direct "write" back to Kafka. Neither seems to work (and
> the steps don't get expanded, unlike in the kafka_taxi example). With the
> "write" variant, I believe the step didn't even get captured in the
> Dataflow graph a few times.
> 3. No errors appear in either the Job Logs or the Worker Logs, except for
> the occasional message emitted from the "log" step when it fires.
>
> All this is happening while I am producing ~4 messages/sec into Kafka. I
> can verify that Kafka is reachable remotely and working normally (I ran
> into some issues setting it up initially).
> I've also tested the KafkaIO.read transform in Java and can confirm that
> it works as expected.
>
> As an aside, I put together an ExternalTransform for MqttIO which you can
> find here:
> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
> I can confirm that it works in batch mode, but given that I couldn't get
> Kafka to work with Dataflow in streaming mode, I don't have much
> confidence in getting this to work either.
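>
> For reference, this is roughly how I invoke it in batch mode (a sketch
> only: the import path and parameter names below are hypothetical
> stand-ins, not necessarily the exact names in the gist):
>
> import apache_beam as beam
> from mqtt_external import ReadFromMqtt  # hypothetical import path; see the gist
>
> with beam.Pipeline() as p:
>     (
>         p
>         | "read" >> ReadFromMqtt(
>             "tcp://localhost:1883",  # example MQTT server URI
>             "topic",                 # example topic
>             max_num_records=100,     # bounds the read so it runs as batch
>         )
>         | "log" >> beam.Map(print)
>     )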
>
> Thanks for your help.
>
> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> What error did you run into with Dataflow? Did you observe any errors in
>> the worker logs?
>> If you follow the steps given in the example here
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>
>> it should work. Make sure Dataflow workers have access to Kafka bootstrap
>> servers you provide.
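>>
>> For example, a minimal set of Dataflow options might look something like
>> this (the project, region, and bucket values are placeholders; note the
>> bootstrap server must be an address the workers can actually reach, not
>> localhost):
>>
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> options = PipelineOptions([
>>     "--runner=DataflowRunner",
>>     "--project=my-project",                # placeholder
>>     "--region=us-central1",                # placeholder
>>     "--temp_location=gs://my-bucket/tmp",  # placeholder
>>     "--streaming",
>> ])
>> with beam.Pipeline(options=options) as p:
>>     (
>>         p
>>         | "read" >> ReadFromKafka(
>>             {"bootstrap.servers": "broker-host:9092"},  # reachable from workers
>>             topics=["topic"])
>>         | "log" >> beam.Map(print)
>>     )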
>>
>> Portable DirectRunner currently doesn't support streaming mode, so you
>> need to convert your pipeline to a batch pipeline and provide
>> 'max_num_records' or 'max_read_time' to turn the Kafka source into a
>> bounded (batch) source.
>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
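>>
>> For example, something like this bounds the source so it can run as a
>> batch read (a minimal sketch; the broker and topic are placeholders):
>>
>> from apache_beam.io.kafka import ReadFromKafka
>>
>> read = ReadFromKafka(
>>     {"bootstrap.servers": "localhost:9092"},
>>     topics=["topic"],
>>     max_num_records=10,  # stop after 10 records...
>>     # max_read_time=60,  # ...or bound by time (seconds) instead
>> )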
>>
>> Also, portable runners (Flink, Spark, etc.) have a known issue related to
>> SDF checkpointing in streaming mode, which results in messages not being
>> pushed to subsequent steps. This is tracked in
>> https://issues.apache.org/jira/browse/BEAM-11998.
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay <al...@google.com> wrote:
>>
>>> /cc @Boyuan Zhang <boyu...@google.com> for Kafka and @Chamikara Jayalath
>>> <chamik...@google.com> for multi-language, who might be able to help.
>>>
>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have created a simple snippet as such:
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.io.kafka import ReadFromKafka
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>> import logging
>>>> logging.basicConfig(level=logging.WARNING)
>>>>
>>>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner", "--streaming"])) as p:
>>>>     (
>>>>         p
>>>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"}, topics=["topic"])
>>>>         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>>>>     )
>>>>
>>>> I've set up a single-node Kafka instance similar to the kafka_taxi
>>>> README, and run this on both DirectRunner and DataflowRunner, but it
>>>> doesn't work. What I mean by this is that the transform seems to be
>>>> capturing data but doesn't pass it on to subsequent transforms.
>>>> With DirectRunner, if I send a non-keyed Kafka message to the server it
>>>> actually crashes (saying that it cannot encode null into a byte[]),
>>>> which is why I believe the transform is actually running.
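>>>>
>>>> I can work around the crash by producing keyed messages; here's a
>>>> sketch using the kafka-python package (separate from Beam; the broker,
>>>> topic, and key are placeholders):
>>>>
>>>> from kafka import KafkaProducer  # pip install kafka-python
>>>>
>>>> producer = KafkaProducer(bootstrap_servers="localhost:9092")
>>>> # A non-null key gives Beam's byte[] coder something to encode.
>>>> producer.send("topic", key=b"some-key", value=b"hello")
>>>> producer.flush()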
>>>>
>>>> My main objective really is to create a streaming ExternalTransform for
>>>> MqttIO and SolaceIO (
>>>> https://github.com/SolaceProducts/solace-apache-beam).
>>>> I've implemented the builder and registrars and they work in batch mode
>>>> (with maxNumRecords) but otherwise it fails to read.
>>>>
>>>> With MqttIO, the streaming transform gets stuck waiting for a bundle to
>>>> complete (if I continuously send messages into the MQTT server); after
>>>> I stop sending, the bundles finish but nothing gets passed on either.
>>>>
>>>> I appreciate any help I can get with this.
>>>> Thanks!
>>>>
>>>> Cheers
>>>> Alex