> The main concern I have is that concurrent calls from the different pipelines might step on each other in some way

In your current solution this wouldn't happen because you'd actually end up with 3 separate sink operators.
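Here is why they don't interfere. Flink requires user functions such as a `SinkFunction` to be `Serializable` and ships them to the tasks in serialized form. Each sink operator, and each parallel instance of it, therefore works on its own deserialized copy rather than on the `sink` object you created in `main`. Below is a minimal plain-Java sketch of that effect using `java.io` serialization. It does not use Flink at all. `CountingSink` and `roundTrip` are hypothetical names, and the round trip is only a simplified model of how a function reaches a task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SinkCopyDemo {
    // Hypothetical stand-in for a user SinkFunction: Serializable, with mutable state.
    public static class CountingSink implements Serializable {
        private static final long serialVersionUID = 1L;
        private int invocations = 0;

        public void invoke(String value) {
            invocations++;
        }

        public int count() {
            return invocations;
        }
    }

    // Serialize and immediately deserialize an object: a simplified model of
    // shipping a user function to a task. Checked exceptions are wrapped.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        CountingSink sink = new CountingSink();
        // One copy per addSink(sink) call, as with job1/job2/job3.
        CountingSink forJob1 = roundTrip(sink);
        CountingSink forJob2 = roundTrip(sink);
        CountingSink forJob3 = roundTrip(sink);

        forJob1.invoke("a");
        forJob1.invoke("b");
        forJob2.invoke("c");

        System.out.println(forJob1 != forJob2 && forJob2 != forJob3); // true
        System.out.println(forJob1.count() + " " + forJob2.count() + " "
                + forJob3.count() + " " + sink.count()); // 2 1 0 0
    }
}
```

A side effect of this model is that mutable fields are not shared either. Anything like a counter or client held in the sink exists once per instance, not once globally.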

> job1.union(job2).union(job3).addSink(sink)

This is the way to go.
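With `union` there is a single sink operator. Each of its parallel instances is invoked by one task thread, one record at a time, no matter how many upstream streams feed it. The toy model below imitates that fan-in in plain Java, without Flink. `CollectingSink` and `runUnion` are hypothetical names. Three producer threads push into one queue, and a single consumer calls `invoke`, so even a sink that is not thread-safe never sees concurrent calls. It is an analogy for the runtime behaviour, not Flink's actual network stack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class UnionFanInDemo {
    // Hypothetical sink that is deliberately NOT thread-safe (plain ArrayList).
    public static class CollectingSink {
        private final List<String> received = new ArrayList<>();

        public void invoke(String value) {
            received.add(value);
        }

        public List<String> received() {
            return received;
        }
    }

    // Toy model of job1.union(job2).union(job3).addSink(sink): one producer
    // thread per input feeds a shared queue, and only the calling thread
    // invokes the sink, so invoke() is never called concurrently.
    public static List<String> runUnion(List<List<String>> inputs) {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        ExecutorService producers = Executors.newFixedThreadPool(Math.max(1, inputs.size()));
        int total = 0;
        for (List<String> input : inputs) {
            total += input.size();
            producers.submit(() -> input.forEach(channel::add));
        }

        CollectingSink sink = new CollectingSink();
        try {
            for (int i = 0; i < total; i++) {
                sink.invoke(channel.take()); // single consumer thread
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            producers.shutdown();
        }
        return sink.received();
    }

    public static void main(String[] args) {
        List<String> out = runUnion(List.of(
                List.of("job1-a", "job1-b"),
                List.of("job2-a"),
                List.of("job3-a", "job3-b", "job3-c")));
        System.out.println(out.size()); // 6
    }
}
```

In the model, records from different inputs interleave in no fixed order, while each input's own order is preserved.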


On 23/09/2021 07:21, Antony Southworth wrote:
Hi

Firstly, apologies if I commit any faux pas; I have never used a mailing list before. Googling, reading the Flink docs, and searching the mailing list archives for "fan-in" didn't turn up much, so I'm hoping someone can enlighten me here.

My use-case is similar to the following:

    DataStream<String> source = env.addSource(new FlinkKinesisConsumer<>("my-kinesis-stream", ...));
    SinkFunction<String> sink = new MyCustomSNSSinkFunction("my-sns-topic", ...);

    // "fan-out" to several paths
    DataStream<String> job1 = source
        .assignTimestampsAndWatermarks(...)
        .keyBy(...)
        .window(TumblingEventTimeWindows.of(...))
        .process(new Job1ProcessFunction());

    DataStream<String> job2 = source
        .assignTimestampsAndWatermarks(...)
        .keyBy(...)
        .window(TumblingEventTimeWindows.of(...))
        .process(new Job2ProcessFunction());

    DataStream<String> job3 = source
        .assignTimestampsAndWatermarks(...)
        .keyBy(...)
        .window(TumblingEventTimeWindows.of(...))
        .process(new Job3ProcessFunction());

    // part I am curious about
    // add the same sink object to each path
    job1.addSink(sink);
    job2.addSink(sink);
    job3.addSink(sink);

    env.execute();

Hopefully the situation I describe is clear (I've omitted a lot of details, so please tell me if it isn't, or if any additional details would help).

I couldn't find any examples, either by Googling or in the Flink docs, of a situation like this, where several processing pipelines (`job1`, `job2`, and `job3` in my example code) all feed into the same `SinkFunction` object. The API docs don't really mention this case either (again, unless I missed it, in which case please point it out to me :) ).

The main concern I have is that concurrent calls from the different pipelines might step on each other in some way; can anyone confirm or deny whether that concern is valid? Should I be using `DataStream.union` first, e.g. `job1.union(job2).union(job3).addSink(sink)`?

Appreciate any advice people have :)

---

*Antony Southworth*
Data Engineer - Halter Limited


