Re: flink streaming - window chaining example

2016-03-27 Thread Balaji Rajagopalan
val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer08[String]("topic_name", new SimpleStringSchema, prop))

// parse the raw strings into event objects
val event: DataStream[SomeEventObj] = stream.map(new MyMapFunction)

// ten-minute window over all events, emitting an updated aggregate every minute
val tenMinute: DataStream[AggEvents] = event
  .timeWindowAll(Time.minutes(10))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
  .apply(new MyWindowFunction1)    // compute the per-minute rate here

// one-hour tumbling window on top of the ten-minute aggregates
val oneHour = tenMinute
  .keyBy(_.mykey)
  .window(TumblingEventTimeWindows.of(Time.minutes(60)))
  .trigger(new MyTriggerFunction)
  .apply(new MyHourWindowFunction) // percentiles, std deviation, etc.


The above is pseudo-code and may contain some syntax errors, but it should do
what you are looking for. The one-hour window consumes the output of the
ten-minute window function, so the two will execute one after the other.
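
For the statistics part of the question, here is a minimal sketch of what the
one-hour window function could compute, assuming the ten-minute aggregate exposes
a numeric per-minute rate (the names ratePerMinute, HourStats and hourStats below
are made up for illustration, they are not part of any Flink API):

// sketch only: mean, standard deviation and 95th percentile over the
// ten-minute aggregates that fall into one one-hour window
case class HourStats(key: String, mean: Double, stdDev: Double, p95: Double)

def hourStats(key: String, rates: Iterable[Double]): HourStats = {
  val values = rates.toVector.sorted   // a fired window contains at least one element
  val mean = values.sum / values.size
  val variance = values.map(v => (v - mean) * (v - mean)).sum / values.size
  val p95 = values((0.95 * (values.size - 1)).round.toInt)
  HourStats(key, mean, math.sqrt(variance), p95)
}

// e.g. called from the one-hour window's apply function:
//   out.collect(hourStats(key, aggs.map(_.ratePerMinute)))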


On Sun, Mar 27, 2016 at 2:20 PM, Chen Bekor  wrote:

> Hi all!
>
> I'm just getting started with Flink and I have a design question.
>
> I'm trying to aggregate incoming events (source: a Kafka topic) on a 10-minute
> tumbling window in order to calculate the incoming event rate (total per
> minute).
>
> I would like to take this window and apply an additional window (60 min)
> in order to calculate percentiles, standard deviation and some other statistics
> on that time window. Finally, I would like to trigger some business logic in
> case the calculation hits a certain threshold.
>
> My main challenge is how to chain the two windows together.
>
> Any help is appreciated (please send Scala example code - I'm not using
> Java :) for this project).
>


Storm topologies compatibility and exactly-once

2016-03-27 Thread Olivier Mallassi
Hello all,

I was reading the Apache Flink documentation and was particularly
interested in two things:
- Compatibility with the Storm API. AFAIU (I need to do more tests), you can
reuse Storm-defined topologies and run them on a Flink cluster. I was
wondering whether this is still a beta feature or production ready. I have not
looked at the implementation, but I also assume there is a kind of bridge
between the Storm APIs and the Flink internals, so that you only execute the
spout/bolt code and do not inherit the Storm internals, right?
- Exactly-once semantics. I have to say this is a great feature :). I was
wondering if these semantics are still available when running a Storm-defined
topology on a Flink cluster (cf. my previous point).

Thanks a lot for your help

Cheers

Olivier


Re: for loop slow

2016-03-27 Thread Chiwan Park
Hi Lydia,

To build an iterative algorithm on Flink, using the API for iterations [1] is
better than using a for-loop. Your program triggers multiple executions through
the repeated calls to `next.gap.print()`, and in each execution Flink reads the
whole input again, which causes performance to degrade.
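
As a rough illustration of the pattern (a sketch only, with a placeholder step
function rather than your PowerIteration logic), the Scala DataSet API expresses
this with `iterate`; the Java API offers the same thing through `IterativeDataSet`
and `closeWith`, which your imports already include:

import org.apache.flink.api.scala._

object IterateSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // stand-in for the data set that the step function refines each round
    val initial: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0)

    // runs the step 10 times inside ONE Flink job, instead of
    // launching a new job for every pass of a for-loop
    val result: DataSet[Double] = initial.iterate(10) { current =>
      current.map(_ * 0.9)   // placeholder: put the per-iteration logic here
    }

    // a single action at the end triggers exactly one execution
    result.print()
  }
}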

Regards,
Chiwan Park

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html

> On Mar 27, 2016, at 7:16 AM, Lydia Ickler  wrote:
> 
> Hi,
> 
> I have an issue with a for-loop.
> If I set the maximal iteration number i to more than 3 it gets stuck and I 
> cannot figure out why.
> With 1, 2 or 3 it runs smoothly.
> I attached the code below and marked the loop with //PROBLEM.
> 
> Thanks in advance!
> Lydia
> 
> package org.apache.flink.contrib.lifescience.examples;
> 
> import edu.princeton.cs.algs4.Graph;
> import edu.princeton.cs.algs4.SymbolDigraph;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.FlatJoinFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.aggregation.Aggregations;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.operators.IterativeDataSet;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
> import 
> org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
> import 
> org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
> import 
> org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.util.Collector;
> 
> import java.util.*;
> 
> import static edu.princeton.cs.algs4.GraphGenerator.simple;
> 
> public class PowerIteration {
> 
> //path to input
> static String input = null;
> //path to output
> static String output = null;
> //number of iterations (default = 7)
> static int iterations = 7;
> //threshold
> static double delta = 0.01;
> 
> public void run() throws Exception {
> ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> 
> //read input file
> DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
> 
> DataSet> eigenVector;
> DataSet> eigenValue;
> 
> //initial:
> //Approximate EigenVector by PowerIteration
> eigenVector = PowerIteration_getEigenVector(matrixA);
> //Approximate EigenValue by PowerIteration
> eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
> //Deflate original matrix
> matrixA = 
> PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue);
> 
> MyResult initial = new MyResult(eigenVector,eigenValue,matrixA);
> 
> MyResult next = null;
> 
> //PROBLEM!!! get i eigenvalue gaps
> for(int i=0;i<2;i++){
> next = PowerIteration_routine(initial);
> initial = next;
> next.gap.print();
> }
> 
> env.execute("Power Iteration");
> }
> 
> public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(
> ExecutionEnvironment env, String filePath) {
> CsvReader csvReader = env.readCsvFile(filePath);
> csvReader.fieldDelimiter(",");
> csvReader.includeFields("ttt");
> return csvReader.types(Integer.class, Integer.class, Double.class);
> }
> 
> public static final class ProjectJoinResultMapper implements
> MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
> Tuple3<Integer, Integer, Double>> {
> @Override
> public Tuple3<Integer, Integer, Double> map(
> Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
> throws Exception {
> Integer row = value.f0.f0;
> Integer column = value.f1.f1;
> Double product = value.f0.f2 * value.f1.f2;
> return new Tuple3<Integer, Integer, Double>(row, column, product);
> }
> }
> 
> public static final class RQ implements
> MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
> Tuple3<Integer, Integer, Double>> {
> 
> @Override
> public Tuple3<Integer, Integer, Double> map(
> Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
> throws Exception {
> 
> return new Tuple3<Integer, Integer, Double>(value.f0.f0, value.f0.f1, value.f0.f2 / value.f1.f2);
> }
> }
> 
> public static void main(String[] args) throws Exception {
> if(args.length<2 || args.length > 4){
> System.err.println("Usage: PowerIteration   path> optional:  ");
> System.exit(0);
> }
> 
> input = args[0];
> output = args[1];
> 
> if(args.length==3) {
>

flink streaming - window chaining example

2016-03-27 Thread Chen Bekor
Hi all!

I'm just getting started with Flink and I have a design question.

I'm trying to aggregate incoming events (source: a Kafka topic) on a 10-minute
tumbling window in order to calculate the incoming event rate (total per
minute).

I would like to take this window and apply an additional window (60 min)
in order to calculate percentiles, standard deviation and some other statistics
on that time window. Finally, I would like to trigger some business logic in
case the calculation hits a certain threshold.

My main challenge is how to chain the two windows together.

Any help is appreciated (please send Scala example code - I'm not using
Java :) for this project).


Re: Memory Leak using ProcessingTimeTimers?

2016-03-27 Thread Aljoscha Krettek
Hi,
you are right, this is a problem. In an earlier version we were only
setting very few actual timers via the RuntimeContext, because a firing
timer triggers all the timers with a lower timestamp that we have
stored in the trigger queue. We have to change the lower-level trigger
service (in StreamTask) to only store one timer per very short time window,
so that if the window operator registers thousands of timers for, say,
15:30:03, it actually only schedules one timer.

I created a Jira Issue: https://issues.apache.org/jira/browse/FLINK-3669
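
For illustration, the coalescing described above boils down to rounding each
requested timer up to a fixed-granularity bucket and scheduling one physical
timer per bucket; a toy sketch of the idea (not the actual StreamTask code):

import scala.collection.mutable

// coalesce many logical timers into one physical timer per bucket
class CoalescingTimers(bucketMillis: Long, schedulePhysicalTimer: Long => Unit) {
  private val scheduledBuckets = mutable.Set[Long]()

  def register(timestamp: Long): Unit = {
    // round the requested time up to the end of its bucket
    val bucket = ((timestamp + bucketMillis - 1) / bucketMillis) * bucketMillis
    // schedule a real timer only once per bucket; when it fires, all logical
    // timers with a lower or equal timestamp are processed
    if (scheduledBuckets.add(bucket)) {
      schedulePhysicalTimer(bucket)
    }
  }
}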

Cheers,
Aljoscha

On Thu, 24 Mar 2016 at 11:30 Konstantin Knauf 
wrote:

> Hi everyone,
>
> we were testing a Flink streaming job (1.0.0), with a GlobalWindow on a
> KeyedStream with custom Trigger.
>
> On each element the trigger function registers a processing time timer
> and deletes the currently registered processing time timer. So we are
> registering a lot of timers, but also deleting most of them right away.
>
> The desired functionality is that the window is purged (and all state
> is set to null) after a timeout (last event for this key + timeout).
>
> The performance tests showed that after a short time (5 minutes or so) all
> the time went to garbage collection. From the heap dumps, we can tell
> that the problem was the retained TriggerTasks (with references to the
> TriggerContext) of all the registered processing time timers.
>
> The problem seems to be that, when deleting timers, the corresponding
> Callables are not removed from the queue; the
> deleteProcessingTimeTimer method only removes the timer from the
> sets/queues of the TriggerContext itself, but not from the RuntimeContext.
>
> Is this a bug? Are we using ProcessingTimeTimers in a fundamentally
> wrong way? If so, is there any other way to achieve the desired
> functionality?
>
> We have a workaround in place now (basically just a timeout starting
> with the first element in window instead of the last element in the
> window).
>
> Cheers,
>
> Konstantin
>
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
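
For reference, the register-one/delete-one pattern described in this thread looks
roughly like the sketch below (written from memory against the 1.0 Trigger API, so
names and signatures may need adjusting for your version; TimeoutTrigger is a
made-up name). Until FLINK-3669 is fixed, this is exactly the pattern whose
cancelled timers pile up in the queue:

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// sketch of a "purge the window N ms after the last element for the key" trigger
class TimeoutTrigger[T](timeoutMillis: Long) extends Trigger[T, GlobalWindow] {

  private val lastTimerDesc =
    new ValueStateDescriptor[java.lang.Long]("lastTimer", classOf[java.lang.Long], null)

  override def onElement(element: T, timestamp: Long, window: GlobalWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val lastTimer = ctx.getPartitionedState(lastTimerDesc)
    // drop the timeout registered for the previous element, if any
    if (lastTimer.value() != null) {
      ctx.deleteProcessingTimeTimer(lastTimer.value())
    }
    // register a fresh timeout relative to the latest element
    val fireAt = System.currentTimeMillis() + timeoutMillis
    ctx.registerProcessingTimeTimer(fireAt)
    lastTimer.update(fireAt)
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: GlobalWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE // emit the window and clear its state on timeout

  override def onEventTime(time: Long, window: GlobalWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
}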