Hello all,

I am currently examining the effects of stream partitioning on performance for
simple stateful scenarios.

My toy application for the rest of my question is the following: a stream of
non-negative integers, each annotated with a timestamp, where the goal is to
compute the top-10 most frequent non-negative integers over tumbling windows of
10 seconds. In other words, my input is a stream of tuples with two fields,
Tuple2<Long, Integer>(timestamp, key), where key is the non-negative integer
value and timestamp is used to assign each event to a window. The execution
plan I am considering has a first phase (Phase 1), in which the stream is
partitioned and the partial aggregations are processed in parallel (parallelism
set to N > 1). The second phase (Phase 2) gathers all partial aggregations on a
single node (parallelism set to 1), calculates the full aggregation for each
key, orders the keys by windowed frequency, and outputs the top-10 keys for
each window.

As I mentioned earlier, my goal is to compare the performance of different
partitioning policies on this toy application. Initially, I want to compare
shuffle grouping (round-robin) and hash grouping, and then move on to different
partitioning policies using Flink's custom partitioning API. After reading
Flink's documentation, I managed to develop the toy application using hash
partitioning. Below, I present the different parts of my code:

// Phase 0: input setup
DataStream<Tuple3<Long, Integer, Integer>> stream = env.fromCollection(...)
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> event) {
                return event.f0;
            }
        })
        .map((Tuple2<Long, Integer> e) -> new Tuple3<Long, Integer, Integer>(e.f0, e.f1, 1));

In Phase 0, I read the input stream from an in-memory list, define the event
timestamp that will be used for windowing, and extend each event with a value
of 1, which is used to count the appearances of each integer in every window.
Afterwards, for the parallel Phase 1, I use hash partitioning: a .keyBy()
operation on the key of each tuple (i.e., field 1), followed by a .window()
operation to assign each tuple to a window, and finally a .sum(). My code for
(parallel) Phase 1 is the following:

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
        .keyBy(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .sum(2)
        .setParallelism(N);

Moving on to Phase 2, in order to gather all partial results of a single window
in one operator, produce the full aggregation, order the keys by frequency, and
return the top-10 keys, I have the following:

// Phase 2: serial full aggregation and ordering, with a parallelism of 1
DataStream<String> phaseTwo = phaseOne
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply(new AllWindowFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Tuple3<Long, Integer, Integer>> values,
                              Collector<String> out) throws Exception {
                ...
                List<Integer> topTenValues = ...;
                StringBuilder strBuilder = new StringBuilder();
                for (Integer t : topTenValues) {
                    strBuilder.append(t).append(",");
                }
                out.collect(strBuilder.toString());
            }
        });

The previous code uses hash partitioning for its parallel phase. From what I
understand, Flink allows the .window() operation only on a KeyedStream.
Furthermore, the .partitionCustom() method transforms a DataStream into a
DataStream (and the same is true for .shuffle(), which distributes events
round-robin). Therefore, I am confused about how I can use a shuffle policy
together with windows. One idea that came to me is to provide an irrelevant
field to the .keyBy() method, or to define my own KeySelector<IN, KEY> that
simulates shuffle grouping through key generation. Unfortunately, I have two
concerns about these alternatives. For the keyBy() approach, I would need to
control the internal hashing mechanism, which entails cherry-picking fields for
different workloads and exhaustively exploring the behavior of different random
fields (not practical). For the KeySelector<IN, KEY> approach, I would need to
maintain state across different calls of getKey(), which (as far as I know) the
KeySelector<IN, KEY> interface does not offer, and I do not want to rely on
external state that would add overhead. Therefore, my first question is: how
can I effectively use round-robin grouping with windows in my toy application?
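To make the artificial-key idea more concrete, a rough sketch of what I have in
mind is the following (purely hypothetical and not validated; NUM_SYNTHETIC_KEYS
and N are constants I would define myself, with NUM_SYNTHETIC_KEYS ideally much
larger than N):

// Hypothetical sketch: tag each record with a synthetic partition id in a map,
// then keyBy() that field, so .window() still operates on a KeyedStream and the
// key stays deterministic because it is stored inside the record itself.
DataStream<Tuple4<Long, Integer, Integer, Integer>> tagged = stream
        .map(new MapFunction<Tuple3<Long, Integer, Integer>,
                Tuple4<Long, Integer, Integer, Integer>>() {
            private int nextKey = 0; // per-subtask round-robin counter

            @Override
            public Tuple4<Long, Integer, Integer, Integer> map(Tuple3<Long, Integer, Integer> e) {
                nextKey = (nextKey + 1) % NUM_SYNTHETIC_KEYS; // assumed constant
                return new Tuple4<Long, Integer, Integer, Integer>(e.f0, e.f1, e.f2, nextKey);
            }
        });

// Phase 1 under the simulated shuffle: partial per-integer counts within each
// synthetic partition and window.
DataStream<Tuple3<Long, Integer, Integer>> phaseOneShuffle = tagged
        .keyBy(3)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply(new WindowFunction<Tuple4<Long, Integer, Integer, Integer>,
                Tuple3<Long, Integer, Integer>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window,
                              Iterable<Tuple4<Long, Integer, Integer, Integer>> values,
                              Collector<Tuple3<Long, Integer, Integer>> out) {
                Map<Integer, Integer> counts = new HashMap<>();
                for (Tuple4<Long, Integer, Integer, Integer> v : values) {
                    counts.merge(v.f1, v.f2, Integer::sum);
                }
                for (Map.Entry<Integer, Integer> c : counts.entrySet()) {
                    out.collect(new Tuple3<Long, Integer, Integer>(
                            window.getStart(), c.getKey(), c.getValue()));
                }
            }
        })
        .setParallelism(N);

My worry with this sketch is that the synthetic keys still go through Flink's
internal key hashing, so a small number of synthetic keys will not necessarily
map one-to-one (or evenly) onto the N subtasks, which brings me back to the
"controlling the internal hashing mechanism" problem above.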

The bigger point I am trying to address revolves around custom partitioning
policies and windows in general. My understanding is that the benefit of a
custom partitioning policy is the ability to control the partitioning process
based on a pre-defined set of resources (e.g., partitions, task slots, etc.).
Hence, I am confused about how I would be able to use partitionCustom()
followed by .window() in the (parallel) Phase 1, in order to test the
performance of different execution plans (i.e., partitioning policies).
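To make this concrete, the only way I currently see to invoke a custom policy is
something like the sketch below (the modulo policy is just a placeholder for
whatever resource-aware policy I would actually want to test):

// Hypothetical sketch of invoking partitionCustom() on field 1 (the integer key).
DataStream<Tuple3<Long, Integer, Integer>> repartitioned = stream
        .partitionCustom(new Partitioner<Integer>() {
            @Override
            public int partition(Integer key, int numPartitions) {
                return key % numPartitions; // placeholder: map key to a channel index
            }
        }, 1);

But as far as I can tell, the result is a plain DataStream, so the only
windowing I can apply to it directly is a non-parallel .windowAll(), and calling
.keyBy() afterwards would repartition the stream again and override my policy.
Am I missing something here?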

I apologize for the long question, but I believe I had to provide enough
detail for the points/questions I currently have. Thank you very much for your
time.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh
