Re: flink streaming - window chaining example
val stream: DataStream[String] = env.addSource(
  new FlinkKafkaConsumer08[String]("topic_name", new SimpleStringSchema, prop))

val events: DataStream[SomeEventObj] = stream.map(MyMapFunction)

val tenMinute: DataStream[AggEvents] = events
  .timeWindowAll(Time.of(10, TimeUnit.MINUTES))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
  .apply(MyWindowFunction1)

val oneHour = tenMinute
  .keyBy(_.mykey)
  .window(TumblingEventTimeWindows.of(Time.minutes(60)))
  .trigger(MyTriggerFunction)

The above is pseudocode and may still have some syntax errors, but it should do what you are looking for. There is a dependency between the ten-minute window and the one-hour window, so one will execute after the other. A slightly fuller Scala sketch follows below the quoted message.

On Sun, Mar 27, 2016 at 2:20 PM, Chen Bekor wrote:
> hi all!
>
> I'm just starting my way with flink and I have a design question.
>
> I'm trying to aggregate incoming events (source: kafka topic) on a 10min
> tumbling window in order to calculate the incoming events rate (total per
> minute).
>
> I would like to take this window and perform an additional window (60 min)
> in order to calculate percentiles, std deviation and some other statistics
> on that time window. finally I would like to trigger some business logic in
> case the calculation hits a certain threshold.
>
> my main challenge is - how to chain the two windows together.
>
> any help is appreciated (please send scala example code - I'm not using
> java :) for this project)
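In case a fuller example helps, here is one possible way to chain the two windows end to end. This is only an untested sketch against the 1.0 Scala DataStream API: the case class, the threshold, the per-minute rate computation and the final print() are made-up placeholders, it assumes the `events` stream and `SomeEventObj` from the snippet above, and it uses the default (processing) time characteristic; event time would additionally need timestamps and watermarks.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class RateStats(mean: Double, stdDev: Double)

val threshold = 100.0  // made-up alert threshold

// 10-minute tumbling window -> events per minute within that window
val perMinuteRate: DataStream[Double] = events
  .timeWindowAll(Time.minutes(10))
  .apply { (window: TimeWindow, in: Iterable[SomeEventObj], out: Collector[Double]) =>
    out.collect(in.size / 10.0)
  }

// 60-minute window over the pre-aggregated rates -> mean and standard deviation
val hourlyStats: DataStream[RateStats] = perMinuteRate
  .timeWindowAll(Time.minutes(60))
  .apply { (window: TimeWindow, in: Iterable[Double], out: Collector[RateStats]) =>
    val xs = in.toSeq
    val mean = xs.sum / xs.size
    val std = math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / xs.size)
    out.collect(RateStats(mean, std))
  }

// business-logic hook: react when the statistics cross the threshold
hourlyStats
  .filter(_.stdDev > threshold)
  .print()  // replace with the actual sink or action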
Storm topologies compatibility and exactly-once
Hello all,

I was reading the Apache Flink documentation and was particularly interested in two things:

- Compatibility with the Storm API. AFAIU (I need to do more tests), you can reuse Storm-defined topologies and run them on a Flink cluster. I was wondering whether this is still a beta feature or production ready? I have not looked at the implementation, but I also assume there is a kind of bridge between the Storm APIs and the Flink internals, so that you only execute the spout/bolt code and do not inherit the Storm internals, right?

- Exactly-once semantics. I have to say this is a great feature :). I was wondering whether these semantics are still available when running a Storm-defined topology on a Flink cluster (cf. my previous point).

Thanks a lot for your help.

Cheers,
Olivier
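For reference on the exactly-once question: for native Flink streaming jobs the guarantee is tied to checkpointing. Below is a minimal, untested Scala sketch of switching it on; the host, port, interval and job name are placeholders, and whether the guarantee carries over to the Storm compatibility layer is exactly the open question above, which this sketch does not answer.

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

object ExactlyOnceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // take a checkpoint every 5 seconds with exactly-once guarantees
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

    env.socketTextStream("localhost", 9999) // placeholder source
      .map(_.toUpperCase)
      .print()

    env.execute("exactly-once sketch")
  }
}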
Re: for loop slow
Hi Lydia,

To build an iterative algorithm on Flink, using the API for iterations [1] would be better than using a for-loop. Your program triggers multiple executions by calling `next.gap.print()` repeatedly; in each execution, Flink reads the whole input again, which causes the performance to degrade. (A minimal sketch of the iterate API follows below the quoted code.)

Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html

> On Mar 27, 2016, at 7:16 AM, Lydia Ickler wrote:
>
> Hi,
>
> I have an issue with a for-loop.
> If I set the maximal iteration number i to more than 3 it gets stuck and I
> cannot figure out why.
> With 1, 2 or 3 it runs smoothly.
> I attached the code below and marked the loop with //PROBLEM.
>
> Thanks in advance!
> Lydia
>
> package org.apache.flink.contrib.lifescience.examples;
>
> import edu.princeton.cs.algs4.Graph;
> import edu.princeton.cs.algs4.SymbolDigraph;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.FlatJoinFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.aggregation.Aggregations;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.operators.IterativeDataSet;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.util.Collector;
>
> import java.util.*;
>
> import static edu.princeton.cs.algs4.GraphGenerator.simple;
>
> public class PowerIteration {
>
>     //path to input
>     static String input = null;
>     //path to output
>     static String output = null;
>     //number of iterations (default = 7)
>     static int iterations = 7;
>     //threshold
>     static double delta = 0.01;
>
>     public void run() throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         //read input file
>         DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
>
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
>
>         //initial:
>         //Approximate EigenVector by PowerIteration
>         eigenVector = PowerIteration_getEigenVector(matrixA);
>         //Approximate EigenValue by PowerIteration
>         eigenValue = PowerIteration_getEigenValue(matrixA, eigenVector);
>         //Deflate original matrix
>         matrixA = PowerIteration_getNextMatrix(matrixA, eigenVector, eigenValue);
>
>         MyResult initial = new MyResult(eigenVector, eigenValue, matrixA);
>
>         MyResult next = null;
>
>         //PROBLEM!!! get i eigenvalue gaps
>         for (int i = 0; i < 2; i++) {
>             next = PowerIteration_routine(initial);
>             initial = next;
>             next.gap.print();
>         }
>
>         env.execute("Power Iteration");
>     }
>
>     public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
>                                                                           String filePath) {
>         CsvReader csvReader = env.readCsvFile(filePath);
>         csvReader.fieldDelimiter(",");
>         csvReader.includeFields("ttt");
>         return csvReader.types(Integer.class, Integer.class, Double.class);
>     }
>
>     public static final class ProjectJoinResultMapper implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             Integer row = value.f0.f0;
>             Integer column = value.f1.f1;
>             Double product = value.f0.f2 * value.f1.f2;
>             return new Tuple3<Integer, Integer, Double>(row, column, product);
>         }
>     }
>
>     public static final class RQ implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>
>             return new Tuple3<Integer, Integer, Double>(value.f0.f0, value.f0.f1, value.f0.f2 / value.f1.f2);
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         if (args.length < 2 || args.length > 4) {
>             System.err.println("Usage: PowerIteration <input path> <output path> optional: <iterations> <delta>");
>             System.exit(0);
>         }
>
>         input = args[0];
>         output = args[1];
>
>         if (args.length == 3) {
>
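To make [1] concrete, here is a minimal, untested Scala sketch of a power-iteration-style loop expressed with the DataSet iterate API instead of a driver-side for-loop. It is not the poster's program: the input path, matrix dimension and step function are placeholders, and normalisation and deflation are left out; it only illustrates that all iterations run inside a single Flink job and a single sink call.

import org.apache.flink.api.scala._

object PowerIterationWithIterateApi {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // (row, column, value) entries of the matrix; path is a placeholder
    val matrix: DataSet[(Int, Int, Double)] =
      env.readCsvFile[(Int, Int, Double)]("/path/to/matrix.csv")

    val n = 100 // placeholder matrix dimension
    val initialVector: DataSet[(Int, Double)] =
      env.fromCollection((0 until n).map(i => (i, 1.0)))

    // All loop iterations run inside one job instead of re-reading the
    // input for every pass, as a driver-side for-loop does.
    val resultVector = initialVector.iterate(10) { vector =>
      // one (un-normalised) power-iteration step: v' = A * v
      matrix.join(vector).where(1).equalTo(0)
        .map { pair => (pair._1._1, pair._1._3 * pair._2._2) }
        .groupBy(0)
        .sum(1)
    }

    resultVector.print() // triggers the single execution
  }
}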
flink streaming - window chaining example
hi all!

I'm just starting my way with Flink and I have a design question.

I'm trying to aggregate incoming events (source: a Kafka topic) on a 10-minute tumbling window in order to calculate the incoming event rate (total per minute).

I would like to take this window and perform an additional window (60 min) in order to calculate percentiles, standard deviation and some other statistics on that time window. Finally, I would like to trigger some business logic in case the calculation hits a certain threshold.

My main challenge is how to chain the two windows together.

Any help is appreciated (please send Scala example code - I'm not using Java :) for this project).
Re: Memory Leak using ProcessingTimeTimers?
Hi,

you are right, this is a problem. In an earlier version we were only setting very few actual timers using the RuntimeContext, because a firing timer will trigger all the timers with a lower timestamp that we have stored in the trigger queue.

We have to change the lower-level trigger service (in StreamTask) to only store one timer per very short time window, so that if the window operator registers thousands of timers for, say, time 15:30:03, it actually only saves one timer (a small sketch of this coalescing idea follows below the quoted message). I created a Jira issue: https://issues.apache.org/jira/browse/FLINK-3669

Cheers,
Aljoscha

On Thu, 24 Mar 2016 at 11:30 Konstantin Knauf wrote:
> Hi everyone,
>
> we were testing a Flink streaming job (1.0.0) with a GlobalWindow on a
> KeyedStream with a custom Trigger.
>
> On each element the trigger function registers a processing time timer
> and deletes the currently registered processing time timer. So we are
> registering a lot of timers, but also deleting most of them right away.
>
> The desired functionality is that the window is purged (and all state
> is set to null) after a timeout (last event for this key + timeout).
>
> The performance tests showed that after a short time (5 mins or so) all
> the time went to garbage collection. From the heap dumps, we can tell
> that the problem was retained TriggerTasks (with references to the
> TriggerContext) of all the registered processing time timers.
>
> The problem seems to be that when deleting the timers, the corresponding
> TriggerTask Callables are not removed from the queue; the
> deleteProcessingTimeTimer method only removes the timer from the
> sets/queues of the TriggerContext itself, but not from the RuntimeContext.
>
> Is this a bug? Are we using ProcessingTimeTimers in a fundamentally
> wrong way? If so, is there any other way to achieve the desired
> functionality?
>
> We have a workaround in place now (basically just a timeout starting
> with the first element in the window instead of the last element in the
> window).
>
> Cheers,
>
> Konstantin
>
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
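To illustrate the coalescing that FLINK-3669 proposes, here is a tiny, framework-independent Scala sketch (not Flink code; the names and the one-second granularity are made up): many fine-grained timer registrations are rounded up to a coarse slot, and only the first registration per slot results in a physical timer.

import scala.collection.mutable

object TimerCoalescingSketch {

  // coarseness of the physical timers (illustrative value only)
  val slotMillis = 1000L

  // slots for which a physical timer has already been scheduled
  private val scheduledSlots = mutable.Set[Long]()

  /** Returns true only when a new physical timer actually has to be scheduled. */
  def register(requestedTime: Long): Boolean = {
    val slot = (requestedTime / slotMillis + 1) * slotMillis // round up to the next slot
    scheduledSlots.add(slot)                                 // add() returns false for duplicates
  }

  def main(args: Array[String]): Unit = {
    // thousands of per-element timer registrations that all fall into the same second ...
    val requests = (0 until 10000).map(i => 1458830400000L + i % 1000)
    val physicalTimers = requests.count(register)
    // ... collapse into a single physical timer
    println(s"requested: ${requests.size}, physical timers: $physicalTimers")
  }
}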