Hi Filip, 

My point was not about the computation of the "maximum". My point was: You 
could hopefully read the stream sequentially and just assign punctuated 
watermarks to it. Once you have assigned the watermarks properly (And before 
you do your expensive computatation, like in this case parsing the entire event 
and building the sum), you could tell flink to repartition / key the data and 
shuffle it to the worker tasks in the network, so that the downstream 
operations are performed in parallel. Flink will afaik then take care of 
dealing with the watmark internally and everything is fine. 
I think it is a rare usecase that you have a sequential stream which can not be 
simply read sequentally. If its such a large stream, that you can't do on a 
single host: "Read, extract special event, shuffle to the network to other 
tasks", you probably have a larger issue and need to rethink on the source 
level already, e.g. change the method serialization to something which has a 
really lightweight parsing for finding the special events or such. 

Best regards 
Theo 


Von: "Filip Niksic" <fnik...@seas.upenn.edu> 
An: "Theo Diefenthal" <theo.diefent...@scoop-software.de> 
CC: "user" <user@flink.apache.org> 
Gesendet: Donnerstag, 10. Oktober 2019 00:08:38 
Betreff: Re: [QUESTION] How to parallelize with explicit punctuation in Flink? 

Hi Theo, 

It is a single sequential stream. 

If I read your response correctly, you are arguing that summing a bunch of 
numbers is not much more computationally intensive than assigning timestamps to 
those numbers, so if the latter has to be done sequentially anyway, then why 
should the former be done in parallel? To that I can only say that the example 
I gave is intentionally simple in order to make the problem conceptually clean. 
By understanding the conceptually clean version of the problem, we also gain 
insight into messier realistic versions where the operations we want to 
parallelize may be much more computationally intensive. 

Filip 



On Wed, Oct 9, 2019 at 1:28 PM [ mailto:theo.diefent...@scoop-software.de | 
theo.diefent...@scoop-software.de ] < [ 
mailto:theo.diefent...@scoop-software.de | theo.diefent...@scoop-software.de ] 
> wrote: 


Hi Filip, I don't really understand your problem here. 
Do you have a source with a single sequential stream, where from time to time, 
there is a barrier element? Or do you have a source like Kafka with multiple 
partitions? 
If you have case 2 with multiple partitions, what exactly do you mean by "order 
matters"? Will each partition have its own barrier? Or do you have just one 
barrier for all partitions? In that case, you will naturally have an ordering 
problem if your events itself contain no time data. 
If you have a "sequential source" why do you need parallelism? Won't it work 
out to read that partition data in one task (possibly skipping deserialization 
as much as possible to only recognize barrier events) and then add a downstream 
task with higher parallelism doing the full deserialization and other work? 
Best regardsTheo 
-------- Ursprüngliche Nachricht -------- 
Betreff: Re: [QUESTION] How to parallelize with explicit punctuation in Flink? 
Von: Yun Gao 
An: Filip Niksic ,user 
Cc: Chesnay Schepler 


Hi Filip, 

As a whole, I also think to increase the parallelism of the reduce to more than 
1, we should use a parallel window to compute the partial sum and then sum the 
partial sum with WindowAll. 

For the assignTimestampAndWatermarks, From my side I think the current usage 
should be OK and it works the same to the other operators. Besides, for the 
keyBy Partitioner, I think "% PARALLELISM" is not necessary and Flink will take 
care of the parallelism. In other words, I think you can use .keyBy(x -> 
x.getId()) directly. 

Best, 
Yun 


------------------------------------------------------------------ 
From:Filip Niksic < [ mailto:fnik...@seas.upenn.edu | fnik...@seas.upenn.edu ] 
> 
Send Time:2019 Oct. 9 (Wed.) 12:21 
To:user < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
Cc:Yun Gao < [ mailto:yungao...@aliyun.com | yungao...@aliyun.com ] >; Chesnay 
Schepler < [ mailto:ches...@apache.org | ches...@apache.org ] > 
Subject:Re: [QUESTION] How to parallelize with explicit punctuation in Flink? 

Here is the solution I currently have. It turned out to be more complicated 
than I expected. It would be great if a more experienced Flink user could 
comment and point out the shortcomings. And if you have other ideas for 
achieving the same thing, let me know! 

Let's start like in the original email, except now we set the time 
characteristic to EventTime and parallelism to a constant named PARALLELISM. 

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

final int PARALLELISM = 2; 
env.setParallelism(PARALLELISM); 

DataStream<DataItem> stream = env.fromElements(DataItem.class, 
new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier()); 

The first step is to use a punctuation-based timestamp-and-watermark assigner 
as follows. We keep track of the number of barriers in the stream. We assign a 
timestamp n to the n-th barrier and all the values that immediately precede it, 
and we emit a watermark with timestamp n on the n-th barrier. This will allow 
us to define 1 millisecond tumbling windows that precisely capture the values 
between two barriers. 

DataStream<DataItem> timedStream = 
stream.assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks<DataItem>() { 
private long barrierCount = 0; 

@Override 
public long extractTimestamp(DataItem item, long previousTimestamp) { 
return barrierCount; 
} 

@Nullable 
@Override 
public Watermark checkAndGetNextWatermark(DataItem item, long 
extractedTimestamp) { 
if (item instanceof Barrier) { 
barrierCount++; 
return new Watermark(extractedTimestamp); 
} 
return null; 
} 
}); 

In the test input stream, the first value and barrier get a timestamp 0, and 
the next two values and the final barrier get a timestamp 1. Two watermarks 
with timestamps 0 and 1 are emitted. 

To achieve parallelization, we partition the values by artificially generated 
keys. A value's key is based on its position in the stream, so we first wrap 
the values into a type that contains this information. 

class ValueWithId { 
private final int val; 
private final long id; 

public ValueWithId(int val, long id) { 
this.val = val; 
[ http://this.id/ | this.id ] = id; 
} 
public int getVal() { return val; } 
public long getId() { return id; } 
} 

Here is the mapping. At the same time we can drop the barriers, since we no 
longer need them. Note that we need to explicitly set the mapping operator's 
parallelism to 1, since the operator is stateful. 

DataStream<ValueWithId> wrappedStream = 
timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() { 
private long count = 0L; 

@Override 
public void flatMap(DataItem item, Collector<ValueWithId> collector) throws 
Exception { 
if (item instanceof Value) { 
int val = ((Value) item).getVal(); 
collector.collect(new ValueWithId(val, count++)); 
} 
} 
}).setParallelism(1); 

Now we're ready to do the key-based partitioning. A value's key is its id as 
assigned above modulo PARALLELISM. We follow the partitioning by splitting the 
stream into 1 millisecond tumbling windows. Then we simply aggregate the 
partial sums, first for each key separately (and importantly, in parallel), and 
then for each window. 

DataStream<Integer> partialSums = wrappedStream.keyBy(x -> x.getId() % 
PARALLELISM) 
.timeWindow(Time.of(1L, TimeUnit.MILLISECONDS)) 
.aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() { 
@Override 
public Integer createAccumulator() { return 0; } 

@Override 
public Integer add(ValueWithId valueWithId, Integer acc) { return acc + 
valueWithId.getVal(); } 

@Override 
public Integer getResult(Integer acc) { return acc; } 

@Override 
public Integer merge(Integer acc1, Integer acc2) { return acc1 + acc2; } 
}) 
.timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS)) 
.reduce((x, y) -> x + y); 

Finally, in the original problem I asked for cumulative sums since the start of 
the stream, so we perform the last set of transformations to achieve that. 

DataStream<Integer> cumulativeSums = partialSums 
.windowAll(GlobalWindows.create()) 
.trigger(CountTrigger.of(1)) 
.reduce((x, y) -> x + y); 
cumulativeSums.print().setParallelism(1); 
env.execute(); 
// We should see 1 followed by 3 as output 

I am not completely sure if my usage of state in the timestamp-and-watermark 
assigner and the mapper is correct. Is it possible for Flink to duplicate the 
assigner, move it around and somehow mess up the timestamps? Likewise, is it 
possible for things to go wrong with the mapper? 

Another concern I have is that my key-based partitions depend on the constant 
PARALLELISM. Ideally, the program should be flexible about the parallelism that 
happens to be available during runtime. 

Finally, if anyone notices that I am in any part reinventing the wheel and that 
Flink already has a feature implementing some of the above, or that something 
can be done more elegantly, let me know! 

Best regards, 

Filip 


On Tue, Oct 8, 2019 at 11:12 AM Filip Niksic < [ mailto:fnik...@seas.upenn.edu 
| fnik...@seas.upenn.edu ] > wrote: 

Hi Chesnay, 

Thanks for the reply. While your solution ultimately does use multiple 
partitions, from what I can tell the underlying processing is still sequential. 
Imagine a stream where barriers are quite rare, say a million values is 
followed by a barrier. Then these million values all end up at the same 
partition and are added up sequentially, and while they are being processed, 
the other partitions are waiting for their turn. A truly parallel solution 
would partition the million values, process each partition in parallel to get 
the partial sums, and on each barrier aggregate the partial sums into a total 
sum. 

Filip 


On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler < [ mailto:ches...@apache.org | 
ches...@apache.org ] > wrote: 
In other words, you need a way to partition the stream such that a series of 
items followed by a barrier are never interrupted. 

I'm wondering whether you could just apply DataStream#partitionCustom to your 
source: 
public static class BarrierPartitioner implements Partitioner<DataItem> { 

private int currentPartition = 0; 
@Override 
public int partition(DataItem key, int numPartitions) { 
if (key instanceof Barrier) { 
int partitionToReturn = currentPartition; 
currentPartition = (currentPartition + 1) % numPartitions; 
return partitionToReturn; 
} else { 
return currentPartition; 
} 
} 
} 

DataStream<DataItem> stream = ...; 
DataStream<DataItem> partitionedStream = stream.partitionCustom(new 
BarrierPartitioner(), item -> item); 

On 08/10/2019 14:55, Filip Niksic wrote: 
Hi Yun, 

The behavior with increased parallelism should be the same as with no 
parallelism. In other words, for the input from the previous email, the output 
should always be 1, 3, regardless of parallelism. Operationally, the partial 
sums maintained in each subtask should somehow be aggregated before they are 
output. 

To answer the second question, I know that watermarks provide the same 
functionality. Is there some way to convert the input with explicit punctuation 
into one with watermarks? I see there is an interface called 
AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not sure 
how this assigner would be used. For example, it could maintain the number of 
previously seen Barriers and assign this number as a watermark to each Value, 
but then this number becomes the state that needs to be shared between multiple 
substreams. Or perhaps the Barriers can somehow be duplicated and sent to each 
substream? Alternatively, is there some notion of event-based windows that 
would be triggered by specific user-defined elements in the stream? In such 
mechanism perhaps the watermarks would be used internally, but they would not 
be explicitly exposed to the user? 

Best regards, 

Filip 


On Tue, Oct 8, 2019 at 2:19 AM Yun Gao < [ mailto:yungao...@aliyun.com | 
yungao...@aliyun.com ] > wrote: 

Hi Filip, 
I have one question on the problem: what is the expected behavior when the 
parallelism of the FlatMapFunction is increased to more than 1? Should each 
subtask maintains the partial sum of all values received, and whenever the 
barrier is received, then it just outputs the partial sum of the received value 
? 

Another question is that I think in Flink the watermark mechanism has provided 
the functionality similar to punctuation, therefore is it possible to implement 
the same logic with the Flink Window directly? 
Best, 
Yun 

------------------------------------------------------------------ 
From:Filip Niksic < [ mailto:fnik...@seas.upenn.edu | fnik...@seas.upenn.edu ] 
> 
Send Time:2019 Oct. 8 (Tue.) 08:56 
To:user < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
Subject:[QUESTION] How to parallelize with explicit punctuation in Flink? 

Hi all, 
What would be a natural way to implement a parallel version of the following 
Flink program? 
Suppose I have a stream of items of type DataItem with two concrete 
implementations of DataItem: Value and Barrier. Let’s say that Value holds an 
integer value, and Barrier acts as explicit punctuation. 
public interface DataItem {} 
public class Value implements DataItem { 
private final int val; 
public Value(int val) { this.val = val; } 
public int getVal() { return val; } 
} 
public class Barrier implements DataItem {} 
The program should maintain a sum of values seen since the beginning of the 
stream. On each Barrier, the program should output the sum seen so far. 
An obvious way to implement this would be with a FlatMapFunction, maintaining 
the sum as state and emitting it on each Barrier. 
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
DataStream<DataItem> stream = env.fromElements(DataItem.class, 
new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier()); 
stream.flatMap(new FlatMapFunction<DataItem, Integer>() { 
private int sum = 0; 
@Override 
public void flatMap(DataItem dataItem, Collector<Integer> collector) throws 
Exception { 
if (dataItem instanceof Value) { 
sum += ((Value) dataItem).getVal(); 
} else { 
collector.collect(sum); 
} 
} 
}).setParallelism(1).print().setParallelism(1); 
env.execute(); 
// We should see 1 followed by 3 as output 
However, such an operator cannot be parallelized, since the order of Values and 
Barriers matters. That’s why I need to set parallelism to 1 above. Is there a 
way to rewrite this to exploit parallelism? 
(Another reason to set parallelism to 1 above is that I’m assuming there is a 
single instance of the FlatMapFunction. A proper implementation would take more 
care in using state. Feel free to comment on that as well.) 

Best regards, 

Filip Niksic 






Reply via email to