Re: FlinkCEP 1.3 scala, cannot apply select function
Thank you for your quick response. That worked and compiled, but another error came up. At runtime it gives the following:

java.lang.ClassCastException: MyEventType cannot be cast to scala.collection.IterableLike

The error occurs at the line val startEvent = pattern.get("first").get.head of myFunction.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-1-3-scala-cannot-apply-select-function-tp13824p13828.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
FlinkCEP 1.3 scala, cannot apply select function
Hello, I have created a simple pattern with FlinkCEP 1.3, as well as a simple pattern select function. My function is as follows:

def myFunction(pattern: Map[String,Iterable[MyEventType]]): MyEventType = {
  val startEvent = pattern.get("first").get.head
  val endEvent = pattern.get("second").get.head
  // dummy functionality for illustration purposes
  endEvent
}

When I apply the function above to a pattern in the following way:

CEP.pattern(myKeyedStream, myDummyPattern).select(myFunction(_))

it gives the following error:

Cannot resolve reference myFunction with such signature.
Type mismatch, expected: scala.Predef.Map[scala.Predef.String,scala.Iterable[MyEventType]], actual: scala.collection.Map[scala.Predef.String,scala.Iterable[MyEventType]]

What is the reason for this behavior?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-1-3-scala-cannot-apply-select-function-tp13824.html
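[Editor's note: the mismatch arises because scala.Predef.Map is the immutable Map, while the CEP Scala API hands the select function a general scala.collection.Map. A minimal, Flink-free sketch of the fix (MyEventType is a stand-in type here) is to declare the parameter with the broader type:]

```scala
// Stand-in event type for illustration.
case class MyEventType(name: String)

// Declaring the parameter as scala.collection.Map (the common supertype of
// mutable and immutable maps) accepts whatever Map implementation the
// caller provides, which matches the signature the CEP Scala API expects.
def myFunction(pattern: scala.collection.Map[String, Iterable[MyEventType]]): MyEventType = {
  val startEvent = pattern("first").head
  val endEvent = pattern("second").head
  endEvent
}

// An immutable Predef.Map conforms to scala.collection.Map, so plain Scala
// callers keep working too.
val result = myFunction(Map(
  "first"  -> List(MyEventType("start")),
  "second" -> List(MyEventType("end"))
))
```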
Re: FlinkCEP latency/throughput
Hello Kostas, thanks for your response. Regarding throughput, that makes sense. But one question remains: how can I measure the latency of my FlinkCEP application? Maybe you answered it, but I didn't quite get it. As for your second question about measuring latency, the answer is yes: the first element in a matching pattern will inevitably wait longer than the last one. Thank you for your time!

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-latency-throughput-tp13170p13221.html
FlinkCEP latency/throughput
Hello everyone, I am testing some patterns with FlinkCEP and I want to measure latency and throughput when using one or more processing cores. How can I do that? What I have done so far:

Latency: each time an event arrives, I store the system time (System.currentTimeMillis). When Flink calls the select function, which means we have a full pattern match, I take the system time again. The difference between the system time taken at the first event of the complex event and the system time taken when the function is called is the latency, for now.

Throughput: I divide the total number of events in the dataset by the time taken to complete the experiment.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-latency-throughput-tp13170.html
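[Editor's note: the measurement scheme described above can be modelled without Flink. The names below are hypothetical stand-ins, not FlinkCEP API:]

```scala
// Model of the latency measurement described above: each event carries the
// wall-clock time at which it arrived, and on a full match the latency is
// measured from the FIRST (oldest) event participating in the match.
case class TimedEvent(payload: String, arrivalMs: Long)

def matchLatencyMs(matchedEvents: Seq[TimedEvent], selectCalledAtMs: Long): Long =
  selectCalledAtMs - matchedEvents.map(_.arrivalMs).min

// Throughput as described: total events divided by total experiment time.
def throughputPerSec(totalEvents: Long, experimentMs: Long): Double =
  totalEvents.toDouble / (experimentMs / 1000.0)
```

Note that, as the reply above points out, the first event of a match always waits longer than the last, so this measures end-to-end match latency rather than per-event processing latency.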
Re: Windows emit results at the end of the stream
I have prepared a small dummy dataset (data.txt) as follows:

Hello|5
Hi|15
WordsWithoutMeaning|25
AnotherWord|34
HelloWorld|46
HelloPlanet|67
HelloFlinkUsers|89
HelloProgrammers|98
DummyPhrase|105
AnotherDummy|123

And below is the code:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val parallelism = 8
// sliding step (ms)
val slidingStep = 30
// window size (ms)
val windowSize = 30

// start the streaming environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
// set the degree of parallelism
env.setParallelism(parallelism)
// set the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val inputFormat = new TextInputFormat(new Path("data.txt"))
env.readFile(inputFormat, "data.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)
  .map { element =>
    val partsOfElement = element.split("[|]")
    (partsOfElement.head, partsOfElement.last.toLong)
  }
  .assignAscendingTimestamps(_._2)
  .keyBy(_._1)
  .timeWindow(Time.milliseconds(windowSize), Time.milliseconds(slidingStep))
  .apply(new Test)

env.execute

And the test class is the following:

class Test extends WindowFunction[(String, Long), String, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    println(s"$key -- ${window.getEnd}")
    out.collect(input.head._1)
  }
}

Each window result is simply the first element of the iterable, and when the window is processed it prints the key with the end time of the window.
If we set the parallelism to 8 as above, it does nothing. If we decrease the parallelism to 4, it only emits results from the first window. You can run the above code and test it yourself.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337p12433.html
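[Editor's note: a plausible explanation, worth verifying against your setup. With event time, the watermark that reaches a window operator is the minimum over all parallel input subtasks. A file this small gives splits to only some of the 8 source subtasks; the idle subtasks never advance their watermark, so no event-time window can ever fire. A Flink-free model of that rule:]

```scala
// Minimal model of watermark propagation across parallel inputs: a window
// operator's effective watermark is the MINIMUM over its input channels.
// A subtask that never emits stays at Long.MinValue and pins the watermark,
// so no event-time window fires downstream.
def effectiveWatermark(perChannel: Seq[Long]): Long = perChannel.min

// 8 source subtasks, but the file is so small only one receives a split.
val channels = 123L +: Seq.fill(7)(Long.MinValue)
```

This also matches the observed behavior that lowering the parallelism (fewer idle subtasks) makes more results appear.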
Re: Windows emit results at the end of the stream
The degree of parallelism in the experiments I mentioned is 8. If I decrease the parallelism, it emits more results. If I set the parallelism to 1, it emits results for the entire dataset (i.e., it behaves as expected). What could be the reason for this?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337p12410.html
Re: Windows emit results at the end of the stream
Hello Till. Yes, elements do have an associated timestamp, which is parsed in the first map function. And yes, indeed, if all timestamps lie within 1 hour the triggering will happen only after the complete file has been read. I had the wrong window size and sliding step for one dataset I was testing (I tested on several different datasets). Now that I have solved the non-triggering from the beginning, one problem remains in all my tests. Assume that I have a dataset of 50 hours of events/elements. The triggering happens as expected, but when it reaches the 6th hour it stops and does not continue (all parallel operators remain idle). Thanks, Sonex

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337p12403.html
Re: Windows emit results at the end of the stream
Thank you for your response, Yassine. I forgot to mention that I use the Scala API. In Scala the equivalent code is:

val inputFormat = new TextInputFormat(new Path("file/to/read.txt"))
env.readFile(inputFormat, "file/to/read.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)

Am I correct? But I have now noticed some weird behavior. Sometimes it never starts to process the elements of the file, and sometimes it stops in the middle of the file without processing the rest of it. Why does that happen?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337p12356.html
Windows emit results at the end of the stream
Hi everyone, I am using a simple window computation on a stream with event time. The code looks like this:

streamData.readTextFile(...)
  .map(...)
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.id)
  .timeWindow(Time.seconds(3600), Time.seconds(3600))
  .apply(new MyWindowFunction)
  .map(...)

By monitoring the memory usage and the Flink web dashboard, I noticed that Flink applies the window function only when the entire stream finishes (thus storing all aggregations in memory) and only then continues to the map transformation. What I would expect is the emission of window results to the map transformation as soon as the results of a window are ready. Can anyone explain this behavior?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337.html
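[Editor's note: for context, an event-time window fires only once the watermark passes the window's end. A bounded file is typically read much faster than event time advances, so the watermark can sweep through many hour boundaries in quick succession and the windows appear to fire in a burst near the end of the job. A small, Flink-free model of the firing rule (names are illustrative, and the real trigger compares against the window's maximum timestamp):]

```scala
// Simplified event-time firing rule: a window fires once the watermark
// reaches or passes its end timestamp. With a fast file source the
// watermark jumps across many window boundaries at once, so many windows
// fire together near the end of the stream.
def firedWindowEnds(windowEnds: Seq[Long], watermark: Long): Seq[Long] =
  windowEnds.filter(_ <= watermark)

val hourlyEnds = Seq(3600L, 7200L, 10800L, 14400L)
```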
Re: load balancing of keys to operators
Thanks for your response. When using time windows, doesn't Flink know the load per window? I have observed this behavior with windows as well.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/load-balancing-of-keys-to-operators-tp12303p12308.html
load balancing of keys to operators
I am using a simple streaming job where I use keyBy on the stream to process events per key. The keys may vary in number (from a few keys to thousands). I have noticed a behavior of Flink and I need clarification on it. When we use keyBy on the stream, Flink assigns keys to parallel operators so each operator can handle events per key independently. Once a key is assigned to an operator, can the key change the operator to which it is assigned? From what I've seen, the answer is no. For example, let's assume that keys 1 and 2 are assigned to operator A and keys 3 and 4 are assigned to operator B. If there is a burst of data for key 1 at some later point in time, but keys 2, 3 and 4 have only a little data, will key 2 be reassigned to operator B to balance the load? If not, is there a way to do that? And again, if not, why does Flink not do that?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/load-balancing-of-keys-to-operators-tp12303.html
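[Editor's note: as background, key placement in Flink is a pure function of the key — roughly, the key is hashed into a key group, and key groups are mapped to operator subtasks — so it cannot react to load. A simplified, Flink-free sketch of that scheme (the real implementation additionally applies a murmur hash on top of hashCode):]

```scala
// Simplified model of deterministic key -> operator assignment. Because the
// subtask index depends only on the key (never on current load), a hot key
// cannot be migrated to another operator at runtime.
def keyGroup(key: Any, maxParallelism: Int): Int =
  (key.hashCode % maxParallelism + maxParallelism) % maxParallelism // non-negative modulo

def operatorIndex(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
  keyGroup * parallelism / maxParallelism

def operatorForKey(key: Any, maxParallelism: Int, parallelism: Int): Int =
  operatorIndex(keyGroup(key, maxParallelism), maxParallelism, parallelism)
```

A deterministic mapping is what makes keyed state recoverable and rescalable; load-aware reassignment would require migrating a key's state between operators mid-flight.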
Re: 回复:Transfer information from one window to the next
I worked out the state handling you were talking about. The solution would look like this (similar to what you wrote):

stream.keyBy(...)
  .timeWindow(...)
  .apply(new WindowFunction() {
    public void apply(K key, W window, Iterable elements, Collector out) {
      out.collect(new Tuple3<>(key, window, elements));
    }
  })
  .keyBy(0) // use the same key as the windows
  .mapWithState(...) // process the windows with shared information

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11738p11805.html
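[Editor's note: a Flink-free model of what the mapWithState step does per key may help here: each window result is combined with the state left behind by the previous window of the same key. All names below are illustrative:]

```scala
// Model of per-key mapWithState across successive window results: fold each
// result with the state that the previous window of the SAME key left behind.
def mapWithStatePerKey[I, O, S](results: Seq[I])(f: (I, Option[S]) => (O, S)): Seq[O] = {
  var state: Option[S] = None // Flink would keep this in keyed state
  results.map { r =>
    val (out, next) = f(r, state)
    state = Some(next)
    out
  }
}

// Example: emit each window's count alongside the previous window's count.
val outputs = mapWithStatePerKey(Seq(3, 5, 2)) { (count: Int, prev: Option[Int]) =>
  (s"count=$count, previous=${prev.getOrElse(0)}", count)
}
```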
Re: 回复:回复:Transfer information from one window to the next
Hi, and thank you for your response. Is it possible to give me a simple example? How can I put the variable into state and then access that state in the next apply function? I am new to Flink. Thank you.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11738p11766.html
Re: Is a new window created for each key/group?
Yes, you are correct. A window will be created for each key/group, and then you can apply a function or aggregate elements per key.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-a-new-window-created-for-each-key-group-tp11745p11746.html
Re: 回复:Transfer information from one window to the next
I don't think you understood the question correctly. I do not care about sharing information between windows at the same time (i.e., start of window = 0, end of window = 3600). I want to pass a variable, let's say for key 1, from the apply function of window 0-3600 to the apply function of window 3600-7200, for key 1.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11738p11739.html
Transfer information from one window to the next
val stream = inputStream
  .assignAscendingTimestamps(_.eventTime)
  .keyBy(_.inputKey)
  .timeWindow(Time.seconds(3600), Time.seconds(3600))

stream.apply{...}

Given the above example, I want to transfer information (variables and values) from the current apply function to the apply function of the next window (of course, for the same key). How can I do that?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11737.html
Re: Cartesian product over windows
Hi Till, when you say parallel windows, what do you mean? Do you mean using timeWindowAll, which gathers all the elements of a window in a single task?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cartesian-product-over-windows-tp11676p11716.html