Hi Filip, I don't really understand your problem here.
Do you have a source with a single sequential stream, where from time to time, 
there is a barrier element? Or do you have a source like Kafka with multiple 
partitions?
If you have case 2 with multiple partitions, what exactly do you mean by "order 
matters"? Will each partition have its own barrier? Or do you have just one 
barrier for all partitions? In that case, you will naturally have an ordering 
problem if your events itself contain no time data.
If you have a "sequential source" why do you need parallelism? Won't it work 
out to read that partition data in one task (possibly skipping deserialization 
as much as possible to only recognize barrier events) and then add a downstream 
task with higher parallelism doing the full deserialization and other work?
Best regardsTheo
-------- Ursprüngliche Nachricht --------
Betreff: Re: [QUESTION] How to parallelize with explicit punctuation in Flink?
Von: Yun Gao
An: Filip Niksic ,user
Cc: Chesnay Schepler


      Hi Filip,

         As a whole, I also think to increase the parallelism of the reduce to 
more than 1, we should use a parallel window to compute the partial sum and 
then sum the partial sum with WindowAll. 

        For the assignTimestampAndWatermarks, From my side I think the current 
usage should be OK and it works the same to the other operators. Besides, for 
the keyBy Partitioner, I think "% PARALLELISM" is not necessary and Flink will 
take care of the parallelism. In other words, I think you can use .keyBy(x -> 
x.getId()) directly.

    Best, 
    Yun


------------------------------------------------------------------
From:Filip Niksic <fnik...@seas.upenn.edu>
Send Time:2019 Oct. 9 (Wed.) 12:21
To:user <user@flink.apache.org>
Cc:Yun Gao <yungao...@aliyun.com>; Chesnay Schepler <ches...@apache.org>
Subject:Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Here is the solution I currently have. It turned out to be more complicated 
than I expected. It would be great if a more experienced Flink user could 
comment and point out the shortcomings. And if you have other ideas for 
achieving the same thing, let me know!

Let's start like in the original email, except now we set the time 
characteristic to EventTime and parallelism to a constant named PARALLELISM.

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

final int PARALLELISM = 2;
env.setParallelism(PARALLELISM);

DataStream<DataItem> stream = env.fromElements(DataItem.class,
        new Value(1), new Barrier(), new Value(3), new Value(-1), new 
Barrier());

The first step is to use a punctuation-based timestamp-and-watermark assigner 
as follows. We keep track of the number of barriers in the stream. We assign a 
timestamp n to the n-th barrier and all the values that immediately precede it, 
and we emit a watermark with timestamp n on the n-th barrier. This will allow 
us to define 1 millisecond tumbling windows that precisely capture the values 
between two barriers.

DataStream<DataItem> timedStream =
        stream.assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks<DataItem>() {
    private long barrierCount = 0;

    @Override
    public long extractTimestamp(DataItem item, long previousTimestamp) {
        return barrierCount;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(DataItem item, long 
extractedTimestamp) {
        if (item instanceof Barrier) {
            barrierCount++;
            return new Watermark(extractedTimestamp);
        }
        return null;
    }
});

In the test input stream, the first value and barrier get a timestamp 0, and 
the next two values and the final barrier get a timestamp 1. Two watermarks 
with timestamps 0 and 1 are emitted.

To achieve parallelization, we partition the values by artificially generated 
keys. A value's key is based on its position in the stream, so we first wrap 
the values into a type that contains this information.

class ValueWithId {
    private final int val;
    private final long id;

    public ValueWithId(int val, long id) {
        this.val = val;
        this.id = id;
    }
    public int getVal() { return val; }
    public long getId() { return id; }
}

Here is the mapping. At the same time we can drop the barriers, since we no 
longer need them. Note that we need to explicitly set the mapping operator's 
parallelism to 1, since the operator is stateful.

DataStream<ValueWithId> wrappedStream =
        timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() {
    private long count = 0L;

    @Override
    public void flatMap(DataItem item, Collector<ValueWithId> collector) throws 
Exception {
        if (item instanceof Value) {
            int val = ((Value) item).getVal();
            collector.collect(new ValueWithId(val, count++));
        }
    }
}).setParallelism(1);

Now we're ready to do the key-based partitioning. A value's key is its id as 
assigned above modulo PARALLELISM. We follow the partitioning by splitting the 
stream into 1 millisecond tumbling windows. Then we simply aggregate the 
partial sums, first for each key separately (and importantly, in parallel), and 
then for each window.

DataStream<Integer> partialSums = wrappedStream.keyBy(x -> x.getId() % 
PARALLELISM)
        .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
        .aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() {
            @Override
            public Integer createAccumulator() { return 0; }

            @Override
            public Integer add(ValueWithId valueWithId, Integer acc) { return 
acc + valueWithId.getVal(); }

            @Override
            public Integer getResult(Integer acc) { return acc; }

            @Override
            public Integer merge(Integer acc1, Integer acc2) { return acc1 + 
acc2; }
        })
        .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
        .reduce((x, y) -> x + y);

Finally, in the original problem I asked for cumulative sums since the start of 
the stream, so we perform the last set of transformations to achieve that.

DataStream<Integer> cumulativeSums = partialSums
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(1))
        .reduce((x, y) -> x + y);
cumulativeSums.print().setParallelism(1);
env.execute();
// We should see 1 followed by 3 as output

I am not completely sure if my usage of state in the timestamp-and-watermark 
assigner and the mapper is correct. Is it possible for Flink to duplicate the 
assigner, move it around and somehow mess up the timestamps? Likewise, is it 
possible for things to go wrong with the mapper?

Another concern I have is that my key-based partitions depend on the constant 
PARALLELISM. Ideally, the program should be flexible about the parallelism that 
happens to be available during runtime.

Finally, if anyone notices that I am in any part reinventing the wheel and that 
Flink already has a feature implementing some of the above, or that something 
can be done more elegantly, let me know!

Best regards,

Filip


On Tue, Oct 8, 2019 at 11:12 AM Filip Niksic <fnik...@seas.upenn.edu> wrote:

Hi Chesnay,

Thanks for the reply. While your solution ultimately does use multiple 
partitions, from what I can tell the underlying processing is still sequential. 
Imagine a stream where barriers are quite rare, say a million values is 
followed by a barrier. Then these million values all end up at the same 
partition and are added up sequentially, and while they are being processed, 
the other partitions are waiting for their turn. A truly parallel solution 
would partition the million values, process each partition in parallel to get 
the partial sums, and on each barrier aggregate the partial sums into a total 
sum.

Filip


On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <ches...@apache.org> wrote:
In other words, you need a way to partition the stream such that a series of 
items followed by a barrier are never interrupted.

I'm wondering whether you could just apply DataStream#partitionCustom to your 
source:
public static class BarrierPartitioner implements Partitioner<DataItem> {

   private int currentPartition = 0;
   @Override
   public int partition(DataItem key, int numPartitions) {
      if (key instanceof Barrier) {
         int partitionToReturn = currentPartition;
         currentPartition = (currentPartition + 1) % numPartitions;
         return partitionToReturn;
      } else {
         return currentPartition;
      }
   }
}
 
DataStream<DataItem> stream = ...;
DataStream<DataItem> partitionedStream = stream.partitionCustom(new 
BarrierPartitioner(), item -> item); 

On 08/10/2019 14:55, Filip Niksic wrote:
Hi Yun,

The behavior with increased parallelism should be the same as with no 
parallelism. In other words, for the input from the previous email, the output 
should always be 1, 3, regardless of parallelism. Operationally, the partial 
sums maintained in each subtask should somehow be aggregated before they are 
output.

To answer the second question, I know that watermarks provide the same 
functionality. Is there some way to convert the input with explicit punctuation 
into one with watermarks? I see there is an interface called 
AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not sure 
how this assigner would be used. For example, it could maintain the number of 
previously seen Barriers and assign this number as a watermark to each Value, 
but then this number becomes the state that needs to be shared between multiple 
substreams. Or perhaps the Barriers can somehow be duplicated and sent to each 
substream? Alternatively, is there some notion of event-based windows that 
would be triggered by specific user-defined elements in the stream? In such 
mechanism perhaps the watermarks would be used internally, but they would not 
be explicitly exposed to the user?

Best regards,

Filip


On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <yungao...@aliyun.com> wrote:

       Hi Filip,
           I have one question on the problem: what is the expected behavior 
when the  parallelism of the FlatMapFunction is increased to more than 1? 
Should each subtask maintains the partial sum of all values received, and 
whenever the barrier is received, then it just outputs the partial sum of the 
received value ? 

          Another question is that I think in Flink the watermark mechanism has 
provided the functionality similar to punctuation,  therefore is it possible to 
implement the same logic with the Flink Window directly?
    Best,
    Yun 

------------------------------------------------------------------
From:Filip Niksic <fnik...@seas.upenn.edu>
Send Time:2019 Oct. 8 (Tue.) 08:56
To:user <user@flink.apache.org>
Subject:[QUESTION] How to parallelize with explicit punctuation in Flink?

Hi all,
What would be a natural way to implement a parallel version of the following 
Flink program?
Suppose I have a stream of items of type DataItem with two concrete 
implementations of DataItem: Value and Barrier. Let’s say that Value holds an 
integer value, and Barrier acts as explicit punctuation.
public interface DataItem {} 
public class Value implements DataItem {
 private final int val;
 public Value(int val) { this.val = val; }
 public int getVal() { return val; }
} 
public class Barrier implements DataItem {}
The program should maintain a sum of values seen since the beginning of the 
stream. On each Barrier, the program should output the sum seen so far.
An obvious way to implement this would be with a FlatMapFunction, maintaining 
the sum as state and emitting it on each Barrier.
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
DataStream<DataItem> stream = env.fromElements(DataItem.class,
 new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier()); 
stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
 private int sum = 0; 
 @Override
 public void flatMap(DataItem dataItem, Collector<Integer> collector) throws 
Exception {
 if (dataItem instanceof Value) {
 sum += ((Value) dataItem).getVal();
       } else {
           collector.collect(sum);
       }
   }
}).setParallelism(1).print().setParallelism(1);
env.execute();
// We should see 1 followed by 3 as output
However, such an operator cannot be parallelized, since the order of Values and 
Barriers matters. That’s why I need to set parallelism to 1 above. Is there a 
way to rewrite this to exploit parallelism? 
(Another reason to set parallelism to 1 above is that I’m assuming there is a 
single instance of the FlatMapFunction. A proper implementation would take more 
care in using state. Feel free to comment on that as well.)

Best regards,

Filip Niksic



Reply via email to