Hi Nikos,

you are of course right. I forgot that ProcessFunction requires a
KeyedStream. Sorry for this advice.
The problem is that you need to implement some kind of time-based
function that emits partial counts every 10 seconds.
AFAIK, the DataStream API does not offer a built-in operator that gives you
this, except for windows and ProcessFunction.

You could try to implement your own operator by extending
AbstractStreamOperator and implementing the OneInputStreamOperator
interface.
This is a fairly low-level interface but gives you access to record
timestamps and watermarks. Actually, the DataStream operators are built on
this interface as well.
A custom operator is applied by calling dataStream.transform().
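
To make the idea more concrete, here is a rough sketch of the bookkeeping such an operator would need. It is plain Java without any Flink dependency, and the class and method names (PartialCounter, onElement, onWatermark) are made up for illustration. In a real operator, I would expect onElement() to be driven from processElement() and onWatermark() from processWatermark(). The sketch does not deal with checkpointing the buffered counts or with late records, which a real operator would have to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper (not a Flink class): keeps partial counts per key for
 * tumbling event-time windows and releases a window once the watermark passes it.
 */
final class PartialCounter {
    private final long windowSizeMs;

    // window start -> (key -> partial count); TreeMap keeps windows ordered by start time
    private final TreeMap<Long, Map<Integer, Integer>> windows = new TreeMap<>();

    PartialCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Role of processElement(): count the record in the window its timestamp falls into. */
    void onElement(long timestamp, int key) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMs);
        windows.computeIfAbsent(windowStart, w -> new TreeMap<>())
               .merge(key, 1, Integer::sum);
    }

    /**
     * Role of processWatermark(): emit and drop every window whose last
     * timestamp (start + size - 1) is covered by the watermark.
     * Each result is {windowStart, key, partialCount}.
     */
    List<long[]> onWatermark(long watermark) {
        List<long[]> out = new ArrayList<>();
        while (!windows.isEmpty() && windows.firstKey() + windowSizeMs - 1 <= watermark) {
            Map.Entry<Long, Map<Integer, Integer>> window = windows.pollFirstEntry();
            for (Map.Entry<Integer, Integer> count : window.getValue().entrySet()) {
                out.add(new long[] {window.getKey(), count.getKey(), count.getValue()});
            }
        }
        return out;
    }
}
```

Because the counts live in the operator instance and not in keyed state, this works on whatever partitioning precedes it (partitionCustom(), rebalance(), shuffle()). The emitted partial counts depend on that partitioning, so Phase 2 still has to merge them per key and window.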

Best,
Fabian



2017-01-24 17:18 GMT+01:00 Katsipoulakis, Nikolaos Romanos <
kat...@cs.pitt.edu>:

> Hello Fabian,
>
>
>
> First, I would like to thank you for your suggestion and the additional
> information on determinism and partition policies. As I mentioned in my
> initial email, I am new to Flink and every additional piece of advice makes
> my “learning curve” less steep. In addition, I am aware that you (and
> everyone else who follows this thread) might wonder why I am following this
> unconventional path of performance partitioning, but I have to inform you
> that my use-case’s goal is of academic nature.
>
>
>
> Turning to your suggestion, I took some time to go over the 1.2-SNAPSHOT
> code, and I read the online documentation on the Process Function API,
> which I found at
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/process_function.html
> From my understanding, the process() transformation can be applied only on
> a KeyedStream<T> and not on a DataStream<T>. Therefore, if I wanted to use
> a custom partition algorithm, I would first have to call partitionCustom()
> (DataStream<T> -> DataStream<T>), followed by keyBy(…) (DataStream<T>
> -> KeyedStream<T>), and finally apply my first pre-aggregation step
> (i.e., a call to process()). Concretely, my code would turn into something
> like the following:
>
> // Phase 1: parallel partial sum, with a parallelism of N (N > 1)
>
> DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
>
>                 .partitionCustom(new CustomPartitioner(...)) // or .rebalance() or .shuffle()
>
>                 *.keyBy(1)*
>
>                 .process(new CustomProcessFunction(..., Time.seconds(10), ...))
>
>                 .sum(2).setParallelism(N);
>
>
>
> Unfortunately, you can understand that the above would be problematic for
> two reasons: First, a call to keyBy() defeats the purpose of a custom
> partitioner, because stream will be (ultimately) partitioned based on the
> keys and not on my CustomPartitioner.selectChannels() method. Second,
> using process() does not solve my problem, because the issue with my
> use-case is to avoid calling keyBy(). If I could do that, then I might as
> well call window() and not use the process API in the first place. To be
> more precise, if I could use a KeyedStream<T>, then I could do the
> following:
>
>
>
> // Phase 1: parallel partial sum, with a parallelism of N (N > 1)
>
> DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
>
>                 .partitionCustom(new CustomPartitioner(...))
>
>                 *.keyBy(1)*
>
>                 *.window*(TumblingEventTimeWindows.of(Time.seconds(10)))
>
>                 .sum(2).setParallelism(N);
>
>
>
> Therefore, I don’t think using a Process Function would solve my problem.
> Am I understanding your suggestion correctly? If yes, I would be grateful
> if you could explain to me in more detail. On top of that, after reading my
> initial email again, I believe that the intentions for my use-case were not
> quite clear. Please, do not hesitate to ask me for any clarifications.
>
>
>
> Again, thank you very much for your interest and your time.
>
>
>
> Kind Regards,
>
>
>
> Nikos R. Katsipoulakis,
>
> Department of Computer Science
>
> University of Pittsburgh
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* Tuesday, January 24, 2017 5:15 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Custom Partitioning and windowing questions/concerns
>
>
>
> Hi Nikos,
>
> Flink's windows require a KeyedStream because they use the keys to manage
> their internal state (each in-progress window has some state that needs to
> be persisted and checkpointed).
>
> Moreover, Flink's event-time window operators return a deterministic
> result. In your use-case, the result of the pre-aggregation (phase 1)
> would not be deterministic because it would depend on the partitioning of
> the input.
>
> I would suggest implementing the pre-aggregation not with a window but
> with a ProcessFunction (available in Flink 1.2-SNAPSHOT, which will be
> released soon).
>
> ProcessFunction allows you to register timers which can be used to emit
> results every 10 seconds.
>
> Hope this helps,
>
> Fabian
>
>
>
>
>
> 2017-01-23 17:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos <
> kat...@cs.pitt.edu>:
>
> Hello all,
>
>
>
> I am currently examining the effects of stream partitioning on performance
> for simple stateful scenarios.
>
>
>
> My toy application for the rest of my question will be the following: A
> stream of non-negative integers, each one annotated with a timestamp, and
> the goal is to get the top-10 most frequent non-negative integers on
> tumbling windows of 10 seconds. In other words, my input is a stream of
> tuples with two fields, Tuple2<Long, Integer>(timestamp, key), where key
> is the non-negative integer value, and timestamp is used to assign each
> event to a window. The execution plan I am considering is to have a *first
> phase (Phase 1)*, where the stream is partitioned and the partial
> aggregations are processed in parallel (set parallelism to N > 1).
> Afterwards, the *second phase (Phase 2)* involves gathering all partial
> aggregations on a single node (set parallelism to 1), calculating the
> full aggregation for each key, ordering the keys based on windowed
> frequency, and outputting the top-10 keys for each window.
>
>
>
> As I mentioned earlier, my goal is to compare the performance of different
> partitioning policies on this toy application. Initially, I want to compare
> shuffle-grouping (round-robin) and hash-grouping and then move on to
> different partitioning policies by using Flink’s CustomPartitioner API.
> After reading Flink’s documentation, I managed to develop the toy
> application using hash-partitioning. Below, I present the different parts
> of my code:
>
>
>
> // Phase 0: input setup
>
> DataStream<Tuple3<Long, Integer, Integer>> stream = env.fromCollection(…)
>
>                 .assignTimestampsAndWatermarks(
>                     new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
>
>                     @Override
>
>                     public long extractAscendingTimestamp(Tuple2<Long, Integer> event) {
>                         return event.f0;
>                     }
>
>                 })
>
>                 .map((Tuple2<Long, Integer> e) -> new Tuple3<Long, Integer, Integer>(e.f0, e.f1, 1));
>
>
>
> In Phase 0, I collect the input stream from an in-memory list, define the
> event timestamp which will be used for windowing, and extend each event
> with a value of 1 for counting the appearances of each number in every
> window. Afterwards, for the parallel Phase 1, I use hash partitioning by
> first applying the .keyBy() operation on the key of each tuple (i.e.,
> field 1), followed by a .window() operation, to assign each tuple to a
> window, and end with a .sum(). My code for (parallel) Phase 1 is the
> following:
>
>
>
> // Phase 1: parallel partial sum, with a parallelism of N (N > 1)
>
> DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
>                 .keyBy(1)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>                 .sum(2).setParallelism(N);
>
>
>
> Moving on to Phase 2, to aggregate all partial results of a single window
> in one operator, producing the full aggregation, ordering by frequency,
> and returning the top-10 keys, I have the following:
>
>
>
> // Phase 2: serial full aggregation and ordering, with a parallelism of 1
>
> DataStream<String> phaseTwo = phaseOne
>
>                 .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
>
>                 .apply(new AllWindowFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() {
>
>                     @Override
>
>                     public void apply(TimeWindow window,
>                             Iterable<Tuple3<Long, Integer, Integer>> values,
>                             Collector<String> out) throws Exception {
>
>                         ...
>
>                         List<Integer> topTenValues = ...;
>
>                         StringBuilder strBuilder = new StringBuilder();
>
>                         for (Integer t : topTenValues)
>
>                             strBuilder.append(t).append(",");
>
>                         out.collect(strBuilder.toString());
>
>                     }
>
>                 });
>
>
>
> The previous code makes use of hash-partitioning for its parallel phase.
> From what I understand, Flink allows the .window() operation only on a
> KeyedStream. Furthermore, the .partitionCustom() method transforms a
> DataStream to a DataStream (and the same is true for .shuffle() which
> round-robins events). Therefore, *I am confused about how I can use a
> shuffle policy with windows*. One idea that came to me is to provide an
> irrelevant field to the .keyBy() method, or define my own KeySelector<IN,
> KEY> that will simulate shuffle grouping through key generation.
> Unfortunately, I have two concerns regarding the previous alternatives: For
> the keyBy() approach, I need to control the internal hashing mechanisms,
> which entails cherry-picking fields on different workloads and performing
> an exhaustive search on the behavior of different random fields (not
> practical). For the KeySelector<IN, KEY> approach, I need to maintain
> state among different calls of getKey(), which (as far as I know) is not
> offered by the KeySelector<IN, KEY> interface, and I do not want to rely
> on external state that will lead to additional overhead. Therefore, *my
> first question is: how will I be able to effectively use round-robin
> grouping with windows in my toy application?*
>
>
>
> The bigger point I am trying to address revolves around custom
> partitioning policies and windows in general. My understanding is that the
> benefit of a custom partitioning policy is to have the ability to control
> the partitioning process based on a pre-defined set of resources (e.g.,
> partitions, task slots etc.). Hence, *I am confused about how I would be
> able to use partitionCustom() followed by .window() in the (parallel)
> Phase 1, to test the performance of different execution plans (i.e.,
> partitioning policies).*
>
>
>
> I apologize for the long question, but I believe that I had to provide
> enough details for the points/questions I currently have (highlighted with
> bold). Thank you very much for your time.
>
>
>
> Kind Regards,
>
>
>
> Nikos R. Katsipoulakis,
>
> Department of Computer Science
>
> University of Pittsburgh
>
>
>
>
>
