Re: Custom Partitioning and windowing questions/concerns

2017-01-25 Thread Fabian Hueske
Hi Nikos,

yes, the hash function is not only used for partitioning but also to
organize the key-partitioned state.
My intuition is that the AbstractStreamOperator approach would be easier to
realize, because you don't need to worry about side effects of changing
Flink internals.
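
The key-partitioned state point can be made concrete with a minimal, dependency-free sketch. It assumes the key-group scheme Flink 1.2 uses: key -> hash -> key group -> owning subtask. The class and method names are hypothetical. The bit mixer stands in for Flink's internal murmur hash, so concrete assignments will differ from a real job. The takeaway, as far as I understand it: keyed state for a key lives only on the subtask that owns the key's key group. A partitioner that routes by any other rule can therefore deliver a record to a subtask that does not hold its state.

```java
import java.util.Objects;

// Hypothetical, simplified model of Flink's key-group assignment.
// mix() is a stand-in bit mixer, NOT the exact hash Flink uses.
class KeyGroupSketch {

    // Stand-in hash mixing; the mask keeps the result non-negative.
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & Integer.MAX_VALUE;
    }

    // key -> key group in [0, maxParallelism)
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(Objects.hashCode(key)) % maxParallelism;
    }

    // key group -> index of the subtask whose state backend holds that group
    static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // True if a record routed to `subtask` would find its key's state there.
    static boolean stateIsLocal(Object key, int subtask, int maxParallelism, int parallelism) {
        int owner = ownerOf(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
        return owner == subtask;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 4;
        for (int key = 0; key < 8; key++) {
            int owner = ownerOf(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
            int routed = key % parallelism; // what some custom partitioner might choose
            System.out.printf("key=%d owner=%d routed=%d stateIsLocal=%b%n",
                    key, owner, routed, stateIsLocal(key, routed, maxParallelism, parallelism));
        }
    }
}
```

For each key, `main` prints the owning subtask next to the subtask a simple `key % parallelism` rule would pick. Wherever the two differ, keyed state would be looked up on a subtask that does not own it.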

Best, Fabian


RE: Custom Partitioning and windowing questions/concerns

2017-01-25 Thread Katsipoulakis, Nikolaos Romanos
Hello Fabian,

Thank you for your response, and there is no need for apologies ☺. As I
mentioned in my previous email, my wording was confusing, so it was only to be
expected that you had an incomplete picture of my goal. Again, thank you for
your help and your time.

As for my plan from here on, I understand that I might have to
implement some custom components myself (I prefer conducting my research on an
actual system over regressing back to an awful simulation). To that end, I
thought of writing my own KeyedStream implementation that provides the
option of using a StreamPartitioner other than the HashPartitioner. This
CustomKeyedStream would be created by a call to a custom method offered by
DataStream, say customKeyBy(int... fields, CustomPartitioner), and it would
work exactly like DataStream.keyBy(int... fields), with the only difference
that it would receive a custom partitioner instead of using the default hash
partitioner. Do you think that this plan is feasible? I am not completely sure
whether the windowed keyed state would be affected by this design in any way.


In addition, I will consider your suggestion of extending the
AbstractStreamOperator and implementing the OneInputStreamOperator. It looks
easier than the approach I described above, and I will try to dive into its
implementation details.

Again, thank you very much for your help and your constructive comments.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh


Re: Custom Partitioning and windowing questions/concerns

2017-01-25 Thread Fabian Hueske
Hi Nikos,

you are of course right. I forgot that ProcessFunction requires a
KeyedStream. Sorry for this advice.
The problem is that you need to implement some kind of time-based
function that emits partial counts every 10 seconds.
AFAIK, the DataStream API does not offer a built-in operator that gives you
this except for windows and ProcessFunction.

You could try to implement your own operator by extending
AbstractStreamOperator and implementing the OneInputStreamOperator
interface.
This is a fairly low-level interface but gives you access to record
timestamps and watermarks. Actually, the DataStream operators are built on
this interface as well.
A custom operator is applied by calling dataStream.transform().
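
Here is a framework-free sketch of the logic such an operator could run for the partial counts. It buffers per-window counts per key and emits them once the watermark passes the end of the window, assuming 10-second tumbling windows and millisecond timestamps. In an actual operator, `onElement` would be driven from `processElement(StreamRecord)` and `onWatermark` from `processWatermark(Watermark)`. Results would go out via `output.collect(...)`, and the watermark would be forwarded via `output.emitWatermark(...)`. Those Flink hooks are named from memory of the 1.2 code and should be checked against the Javadoc. `PartialCountLogic` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: per-window partial counts, emitted when the watermark passes the window end.
class PartialCountLogic {
    private final long windowSize;
    // window start -> (key -> partial count), ordered by window start
    private final TreeMap<Long, Map<Integer, Long>> buffers = new TreeMap<>();

    PartialCountLogic(long windowSizeMillis) {
        this.windowSize = windowSizeMillis;
    }

    // Would be driven by processElement(StreamRecord) in a real operator.
    void onElement(long timestamp, int key) {
        long start = timestamp - Math.floorMod(timestamp, windowSize); // tumbling window start
        buffers.computeIfAbsent(start, s -> new HashMap<>()).merge(key, 1L, Long::sum);
    }

    // Would be driven by processWatermark(Watermark); returns "windowStart,key,count"
    // strings instead of calling output.collect(...).
    List<String> onWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        // Window [start, start + size) is complete once watermark >= start + size - 1.
        while (!buffers.isEmpty() && buffers.firstKey() + windowSize - 1 <= watermark) {
            Map.Entry<Long, Map<Integer, Long>> window = buffers.pollFirstEntry();
            // Sort keys for stable output, then emit one record per key.
            new TreeMap<>(window.getValue()).forEach((key, count) ->
                    emitted.add(window.getKey() + "," + key + "," + count));
        }
        return emitted;
    }

    public static void main(String[] args) {
        PartialCountLogic op = new PartialCountLogic(10_000L); // 10-second windows
        op.onElement(1_000L, 7);
        op.onElement(2_000L, 7);
        op.onElement(12_000L, 3);
        System.out.println(op.onWatermark(9_999L));  // prints [0,7,2]
        System.out.println(op.onWatermark(19_999L)); // prints [10000,3,1]
    }
}
```

The buffers here are plain heap fields. A real operator would also have to snapshot them for checkpointing, and because there is no KeyedStream, operator (non-keyed) state would be the natural place for them.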

Best,
Fabian




RE: Custom Partitioning and windowing questions/concerns

2017-01-24 Thread Katsipoulakis, Nikolaos Romanos
Hello Fabian,

First, I would like to thank you for your suggestion and the additional
information on determinism and partitioning policies. As I mentioned in my
initial email, I am new to Flink, and every additional piece of advice makes my
“learning curve” less steep. In addition, I am aware that you (and everyone
else who follows this thread) might wonder why I am following this
unconventional approach to partitioning, but I should point out that my use
case is academic in nature.

Turning to your suggestion, I took some time to go over the 1.2-SNAPSHOT
code, and I read the online documentation on the Process Function API, which I
found at:
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/process_function.html
From my understanding, the process() transformation can be applied only on a
KeyedStream and not on a DataStream. Therefore, if I wanted to use a
custom partitioning algorithm, I would first have to call
partitionCustom() (DataStream -> DataStream), followed by keyBy(…)
(DataStream -> KeyedStream), and finally apply my first pre-aggregation
step (i.e., call process()). Concretely, my code would look something like
the following:
// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream phaseOne = stream
    .partitionCustom(new CustomPartitioner(...)) // or .rebalance() or .shuffle()
    .keyBy(1)
    .process(new CustomProcessFunction(..., Time.seconds(10), ...))
    .sum(2).setParallelism(N);

Unfortunately, the above would be problematic for two reasons. First, a call
to keyBy() defeats the purpose of a custom partitioner, because the stream will
ultimately be partitioned based on the keys and not on my
CustomPartitioner.selectChannels() method. Second, using process() does not
solve my problem, because the point of my use case is to avoid calling
keyBy(). If I could call it, then I might as well call window() and not use the
process API in the first place. To be more precise, if I could use a
KeyedStream, then I could do the following:

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream phaseOne = stream
    .partitionCustom(new CustomPartitioner(...))
    .keyBy(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .sum(2).setParallelism(N);

Therefore, I don’t think using a Process Function would solve my problem. Am I
understanding your suggestion correctly? If so, I would be grateful if you
could explain it in more detail. On top of that, after reading my initial
email again, I believe that the intentions of my use case were not quite
clear. Please do not hesitate to ask me for any clarifications.

Again, thank you very much for your interest and your time.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh


Re: Custom Partitioning and windowing questions/concerns

2017-01-24 Thread Fabian Hueske
Hi Nikos,

Flink's windows require a KeyedStream because they use the keys to manage
their internal state (each in-progress window has some state that needs to
be persisted and checkpointed).
Moreover, Flink's event-time window operators return a deterministic
result. In your use-case, the result of the pre-aggregation (phase 1)
would not be deterministic because it would depend on the partitioning of the
input.
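
The determinism point can be seen in a small, framework-free simulation of the two phases. The same keys are counted under two different phase-1 partitionings: a hash-like rule (`Integer.hashCode(key) mod n`) and a position-based round-robin rule, both hypothetical stand-ins for Flink partitioners. The partial counts differ, yet the merged phase-2 counts, and therefore the top-k, are identical. All class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntBinaryOperator;

// Hypothetical two-phase count: partitioned partial counts, then a single merge + top-k.
class TwoPhaseCount {
    // Phase 1: n parallel partitions each count only the keys routed to them.
    // `route` maps (position in stream, key) to a partition index in [0, n).
    static List<Map<Integer, Integer>> phaseOne(int[] keys, int n, IntBinaryOperator route) {
        List<Map<Integer, Integer>> partials = new ArrayList<>();
        for (int p = 0; p < n; p++) {
            partials.add(new HashMap<>());
        }
        for (int i = 0; i < keys.length; i++) {
            partials.get(route.applyAsInt(i, keys[i])).merge(keys[i], 1, Integer::sum);
        }
        return partials;
    }

    // Phase 2: merge all partial counts on a single node.
    static Map<Integer, Integer> phaseTwo(List<Map<Integer, Integer>> partials) {
        Map<Integer, Integer> total = new HashMap<>();
        for (Map<Integer, Integer> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Integer::sum));
        }
        return total;
    }

    // Top-k keys by descending count, ties broken by ascending key.
    static List<Integer> topK(Map<Integer, Integer> counts, int k) {
        List<Integer> keys = new ArrayList<>(counts.keySet());
        keys.sort((a, b) -> {
            int byCount = Integer.compare(counts.get(b), counts.get(a));
            return byCount != 0 ? byCount : Integer.compare(a, b);
        });
        return new ArrayList<>(keys.subList(0, Math.min(k, keys.size())));
    }

    public static void main(String[] args) {
        int[] keys = {5, 5, 5, 1, 1, 2, 3, 5, 1, 2};
        int n = 2;
        List<Map<Integer, Integer>> byHash =
                phaseOne(keys, n, (i, key) -> Math.floorMod(Integer.hashCode(key), n));
        List<Map<Integer, Integer>> byRoundRobin = phaseOne(keys, n, (i, key) -> i % n);
        System.out.println(byHash.equals(byRoundRobin));                     // prints false
        System.out.println(phaseTwo(byHash).equals(phaseTwo(byRoundRobin))); // prints true
        System.out.println(topK(phaseTwo(byRoundRobin), 2));                 // prints [5, 1]
    }
}
```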

I would suggest implementing the pre-aggregation not with a window but with
a ProcessFunction (available in Flink 1.2-SNAPSHOT, which will be released
soon).
ProcessFunction allows you to register timers which can be used to emit
results every 10 seconds.
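
The arithmetic behind emitting results "every 10 seconds" with event-time timers is small. This sketch assumes 10-second tumbling windows aligned to epoch 0 and millisecond timestamps. In a ProcessFunction, the value from `timerFor` is what one would register through `ctx.timerService().registerEventTimeTimer(...)`, and `onTimer` would then emit the partial result. The same arithmetic applies to the custom-operator route. `TumblingTimerMath` is a hypothetical helper, not a Flink class.

```java
// Hypothetical helper: tumbling-window start and event-time timer timestamp.
class TumblingTimerMath {
    // Start of the tumbling window containing `timestamp` (floorMod also handles negatives).
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Last timestamp that still belongs to the window; an event-time timer set
    // here fires once the watermark reaches it.
    static long timerFor(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 seconds in milliseconds
        for (long ts : new long[] {0L, 9_999L, 10_000L, 23_456L}) {
            System.out.println(ts + " -> window start " + windowStart(ts, size)
                    + ", timer at " + timerFor(ts, size));
        }
    }
}
```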

Hope this helps,
Fabian



2017-01-23 17:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos <
kat...@cs.pitt.edu>:

> Hello all,
>
> Currently, I am examining the effects of stream partitioning on performance
> for simple stateful scenarios.
>
> My toy application for the rest of my question will be the following: a
> stream of non-negative integers, each one annotated with a timestamp, where
> the goal is to get the top-10 most frequent non-negative integers on
> tumbling windows of 10 seconds. In other words, my input is a stream of
> tuples with two fields, Tuple2(timestamp, key), where key
> is the non-negative integer value, and timestamp is used to assign each
> event to a window. The execution plan I am considering has a *first
> phase (Phase 1)*, where the stream is partitioned and the partial
> aggregations are processed in parallel (parallelism set to N > 1).
> Afterwards, the *second phase (Phase 2)* gathers all partial
> aggregations on a single node (parallelism set to 1), calculates the
> full aggregation for each key, orders the keys by windowed frequency,
> and outputs the top-10 keys for each window.
>
> As I mentioned earlier, my goal is to compare the performance of different
> partitioning policies on this toy application. Initially, I want to compare
> shuffle-grouping (round-robin) and hash-grouping and then move on to
> different partitioning policies by using Flink’s custom partitioning API.
> After reading Flink’s documentation, I managed to develop the toy
> application using hash-partitioning. Below, I present the different parts
> of my code:
>
> // Phase 0: input setup
> DataStream<Tuple3<Long, Integer, Integer>> stream = env.fromCollection(…)
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
>         @Override
>         public long extractAscendingTimestamp(Tuple2<Long, Integer> event) { return event.f0; }
>     })
>     .map((Tuple2<Long, Integer> e) -> new Tuple3<Long, Integer, Integer>(e.f0, e.f1, 1));
>
> In Phase 0, I create the input stream from an in-memory list, define the
> event timestamp that will be used for windowing, and extend each event
> with a value of 1 for counting the occurrences of each number in every
> window. Afterwards, for the parallel Phase 1, I use hash partitioning by
> first applying the .keyBy() operation on the key of each tuple (i.e., field 1),
> followed by a .window() operation to assign each tuple to its window, and
> ending with a .sum(). My code for (parallel) Phase 1 is the following:
>
> // Phase 1: parallel partial sum, with a parallelism of N (N > 1)
> DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
>     .keyBy(1)
>     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>     .sum(2).setParallelism(N);
>
> Moving on to Phase 2: to aggregate all partial results of a single window
> in one operator, producing the full aggregation, ordering the keys by
> frequency, and returning the top-10 keys, I have the following:
>
> // Phase 2: serial full aggregation and ordering, with a parallelism of 1
> DataStream<String> phaseTwo = phaseOne
>     .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
>     .apply(new AllWindowFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() {
>         @Override
>         public void apply(TimeWindow window,
>                 Iterable<Tuple3<Long, Integer, Integer>> values,
>                 Collector<String> out) throws Exception {
>             ...
>             List<Integer> topTenValues = ...;
>             StringBuilder strBuilder = new StringBuilder();
>             for (Integer t : topTenValues)
>                 strBuilder.append(Integer.toString(t) + ",");
>             out.collect(strBuilder.toString());
>         }
>     });
>
> The previous code makes use of hash-partitioning for its parallel phase.
> From what I understand, Flink allows the .window() operation only on a
> KeyedStream. Furthermore, the .partitionCustom() method transforms a
> DataStream into a DataStream (and the same is true for .shuffle(), which
> distributes events randomly, and .rebalance(), which round-robins them).
> Therefore, *I am confused about how I can use a shuffle policy with
> windows*. One idea that came to me is to pass an irrelevant field to
> the .keyBy() method, or define my own KeySelector<IN, KEY> that will simulate sh
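
A toy model of per-channel load already shows the trade-off behind the shuffle-versus-hash comparison described in this question. It is framework-free: `Integer.hashCode(key) mod n` stands in for hash partitioning (Flink's actual key-group hashing differs), and `position mod n` stands in for round-robin. The class and the skewed input are hypothetical.

```java
import java.util.Arrays;

// Hypothetical per-channel load under hash vs. round-robin partitioning.
class PartitionLoad {
    // Stand-in hash partitioning: every occurrence of a key goes to the same channel.
    static int[] hashLoad(int[] keys, int channels) {
        int[] load = new int[channels];
        for (int key : keys) {
            load[Math.floorMod(Integer.hashCode(key), channels)]++;
        }
        return load;
    }

    // Round-robin: the i-th record goes to channel i mod n, regardless of key.
    static int[] roundRobinLoad(int[] keys, int channels) {
        int[] load = new int[channels];
        for (int i = 0; i < keys.length; i++) {
            load[i % channels]++;
        }
        return load;
    }

    public static void main(String[] args) {
        // Skewed input: key 0 is "hot", keys 1..3 appear once each.
        int[] keys = {0, 0, 0, 0, 0, 0, 1, 2, 3, 0, 0, 0};
        System.out.println("hash:        " + Arrays.toString(hashLoad(keys, 4)));       // [9, 1, 1, 1]
        System.out.println("round-robin: " + Arrays.toString(roundRobinLoad(keys, 4))); // [3, 3, 3, 3]
    }
}
```

Round-robin balances the load even with a hot key, but it spreads every key over all channels, which is why Phase 2 has to merge partial counts. Hash partitioning keeps each key's count on one channel at the price of skew.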