Hi Yun,

The behavior with increased parallelism should be the same as with no
parallelism. In other words, for the input from the previous email, the
output should always be 1, 3, regardless of parallelism. Operationally, the
partial sums maintained in each subtask should somehow be aggregated before
they are output.
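
To make the intended semantics concrete, here is a minimal plain-Java sketch of that aggregation. It uses no Flink APIs; the class name BroadcastBarrierSketch, the run method, the round-robin split, and the use of null as a stand-in for a Barrier are all assumptions made for illustration. Values are distributed across substreams, every Barrier is copied to each substream, each simulated subtask emits its partial sum on every Barrier, and a final step adds up the k-th partial sums.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation, not Flink code: an Integer models a Value,
// and null models a Barrier.
class BroadcastBarrierSketch {

    static List<Integer> run(List<Integer> input, int parallelism) {
        // Split: Values go round-robin to one substream,
        // Barriers are duplicated into every substream.
        List<List<Integer>> substreams = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            substreams.add(new ArrayList<>());
        }
        int next = 0;
        for (Integer item : input) {
            if (item == null) {
                for (List<Integer> s : substreams) {
                    s.add(null);
                }
            } else {
                substreams.get(next).add(item);
                next = (next + 1) % parallelism;
            }
        }

        // Each simulated subtask keeps a running partial sum
        // and emits it whenever it sees a Barrier.
        List<List<Integer>> partials = new ArrayList<>();
        for (List<Integer> s : substreams) {
            List<Integer> emitted = new ArrayList<>();
            int sum = 0;
            for (Integer item : s) {
                if (item == null) {
                    emitted.add(sum);
                } else {
                    sum += item;
                }
            }
            partials.add(emitted);
        }

        // Merge: the k-th output is the sum of every subtask's k-th partial sum.
        int barriers = partials.get(0).size();
        List<Integer> result = new ArrayList<>();
        for (int k = 0; k < barriers; k++) {
            int total = 0;
            for (List<Integer> p : partials) {
                total += p.get(k);
            }
            result.add(total);
        }
        return result;
    }

    public static void main(String[] args) {
        // The input from the previous email: 1, Barrier, 3, -1, Barrier.
        List<Integer> input = Arrays.asList(1, null, 3, -1, null);
        for (int p = 1; p <= 3; p++) {
            System.out.println(run(input, p)); // prints [1, 3] for each p
        }
    }
}
```

The merge step relies on every subtask seeing every Barrier, which is why the Barriers are duplicated in the split. How to express that split and merge with Flink operators is exactly the open question.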

To answer the second question, I know that watermarks provide the same
functionality. Is there some way to convert the input with explicit
punctuation into one with watermarks? I see there is an interface called
AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not
sure how this assigner would be used. For example, it could maintain the
number of previously seen Barriers and assign this number as a watermark to
each Value, but then this number becomes the state that needs to be shared
between multiple substreams. Or perhaps the Barriers can somehow be
duplicated and sent to each substream? Alternatively, is there some notion
of event-based windows that would be triggered by specific user-defined
elements in the stream? In such a mechanism perhaps the watermarks would be
used internally, but they would not be explicitly exposed to the user?
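
The barrier-counting idea can be sketched in plain Java as follows. This is only a simulation under stated assumptions, not Flink's watermark API: the class name EpochTaggingSketch, the run method, the (epoch, value) pair encoding, and the use of null as a stand-in for a Barrier are all hypothetical. A single sequential pass tags each Value with the number of Barriers seen before it; after that, the tagged Values can be summed per tag in any order, here with a parallel stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical simulation, not Flink code: an Integer models a Value,
// and null models a Barrier.
class EpochTaggingSketch {

    static List<Integer> run(List<Integer> input) {
        // Sequential pass: tag each Value with the count of Barriers seen so far.
        // This counter is the piece of state that would have to be shared.
        int epoch = 0;
        List<int[]> tagged = new ArrayList<>(); // each entry is {epoch, value}
        for (Integer item : input) {
            if (item == null) {
                epoch++;
            } else {
                tagged.add(new int[] {epoch, item});
            }
        }
        final int closedEpochs = epoch; // epochs that were ended by a Barrier

        // Order-insensitive step: sum the Values of each epoch in parallel.
        Map<Integer, Integer> perEpoch = tagged.parallelStream()
                .collect(Collectors.groupingBy(
                        (int[] t) -> t[0],
                        Collectors.summingInt((int[] t) -> t[1])));

        // One output per Barrier: the running total over all closed epochs.
        // Values after the last Barrier belong to an open epoch and are not emitted.
        List<Integer> out = new ArrayList<>();
        int running = 0;
        for (int e = 0; e < closedEpochs; e++) {
            running += perEpoch.getOrDefault(e, 0);
            out.add(running);
        }
        return out;
    }

    public static void main(String[] args) {
        // The input from the previous email: 1, Barrier, 3, -1, Barrier.
        System.out.println(run(Arrays.asList(1, null, 3, -1, null))); // prints [1, 3]
    }
}
```

The sketch makes the concern visible: the tagging pass is inherently sequential, so only the per-epoch summation benefits from parallelism.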

Best regards,

Filip



On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <yungao...@aliyun.com> wrote:

>
>       Hi Filip,
>
> I have one question about the problem: what is the expected
> behavior when the parallelism of the FlatMapFunction is increased to more
> than 1? Should each subtask maintain the partial sum of all values it has
> received and, whenever a barrier is received, output just that
> partial sum?
>
> Another question: I think the watermark mechanism in Flink already
> provides functionality similar to punctuation, so is it possible to
> implement the same logic directly with Flink windows?
>
>     Best,
>     Yun
>
> ------------------------------------------------------------------
> From:Filip Niksic <fnik...@seas.upenn.edu>
> Send Time:2019 Oct. 8 (Tue.) 08:56
> To:user <user@flink.apache.org>
> Subject:[QUESTION] How to parallelize with explicit punctuation in Flink?
>
> Hi all,
>
> What would be a natural way to implement a parallel version of the
> following Flink program?
>
> Suppose I have a stream of items of type DataItem with two concrete
> implementations of DataItem: Value and Barrier. Let’s say that Value holds
> an integer value, and Barrier acts as explicit punctuation.
>
> public interface DataItem {}
>
> public class Value implements DataItem {
>    private final int val;
>
>    public Value(int val) { this.val = val; }
>
>    public int getVal() { return val; }
> }
>
> public class Barrier implements DataItem {}
>
> The program should maintain a sum of values seen since the beginning of
> the stream. On each Barrier, the program should output the sum seen so far.
>
> An obvious way to implement this would be with a FlatMapFunction,
> maintaining the sum as state and emitting it on each Barrier.
>
> StreamExecutionEnvironment env =
>        StreamExecutionEnvironment.getExecutionEnvironment();
>
> DataStream<DataItem> stream = env.fromElements(DataItem.class,
>        new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
>
> stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
>    private int sum = 0;
>
>    @Override
>    public void flatMap(DataItem dataItem, Collector<Integer> collector) throws Exception {
>        if (dataItem instanceof Value) {
>            sum += ((Value) dataItem).getVal();
>        } else {
>            collector.collect(sum);
>        }
>    }
> }).setParallelism(1).print().setParallelism(1);
>
> env.execute();
>
> // We should see 1 followed by 3 as output
>
> However, such an operator cannot be parallelized, since the order of
> Values and Barriers matters. That’s why I need to set parallelism to 1
> above. Is there a way to rewrite this to exploit parallelism?
>
> (Another reason to set parallelism to 1 above is that I’m assuming there
> is a single instance of the FlatMapFunction. A proper implementation would
> take more care in using state. Feel free to comment on that as well.)
>
>
> Best regards,
>
>
> Filip Niksic
>
>
>
