Re: FlinkCEP 1.3 scala, cannot apply select function

2017-06-19 Thread Sonex
Thank you for your quick response. That worked and compiled, but another error
came up. At runtime it throws the following error:

java.lang.ClassCastException: MyEventType cannot be cast to
scala.collection.IterableLike

The error occurs at this line of myFunction:

val startEvent = pattern.get("first").get.head
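The exception message says that, at runtime, the value stored under "first" is a single MyEventType rather than an Iterable. Because generic type arguments are erased on the JVM, such a mismatch is not detected when the map is handed over; it only surfaces when .head forces a cast to an iterable type. The plain-Scala sketch below reproduces that symptom without Flink. The `mislabeled` map is a hypothetical stand-in for what the select function receives. The thread does not establish why the values would be single events; a mismatch between the flink-cep version compiled against and the one on the runtime classpath is one assumption worth checking.

```scala
object ErasureDemo {
  // Hypothetical stand-in for the user's event type.
  final case class MyEventType(id: Int)

  // A map whose values are single events, although its static type claims they are
  // Iterable[MyEventType]. The unchecked cast succeeds because type arguments are erased.
  def mislabeled: scala.collection.Map[String, Iterable[MyEventType]] = {
    val raw: scala.collection.Map[String, Any] = Map("first" -> MyEventType(1))
    raw.asInstanceOf[scala.collection.Map[String, Iterable[MyEventType]]]
  }

  // The failure only appears here, where the value is used as an Iterable.
  def firstEvent(pattern: scala.collection.Map[String, Iterable[MyEventType]]): MyEventType =
    pattern("first").head

  def main(args: Array[String]): Unit = {
    val result =
      try { firstEvent(mislabeled); "no error" }
      catch { case _: ClassCastException => "ClassCastException" }
    println(result)
  }
}
```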



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-1-3-scala-cannot-apply-select-function-tp13824p13828.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


FlinkCEP 1.3 scala, cannot apply select function

2017-06-19 Thread Sonex
Hello, I have created a simple pattern with FlinkCEP 1.3, as well as a simple
pattern select function. My function is as follows:

def myFunction(pattern: Map[String, Iterable[MyEventType]]): MyEventType = {
  val startEvent = pattern.get("first").get.head
  val endEvent = pattern.get("second").get.head
  // dummy functionality for illustrating purposes
  endEvent
}

When I apply the function above to a pattern as follows:

CEP.pattern(myKeyedStream, myDummyPattern).select(myFunction(_))

it gives the following error:

Cannot resolve reference myFunction with such signature.

Type mismatch, expected:
scala.Predef.Map[scala.Predef.String,scala.Iterable[MyEventType]], actual:
scala.collection.Map[scala.Predef.String,scala.Iterable[MyEventType]]

What is the reason for this behavior?
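In Scala, an unqualified Map refers to scala.Predef.Map, an alias for scala.collection.immutable.Map, which is a subtype of scala.collection.Map. According to the error message, select supplies a scala.collection.Map, so a function that accepts only the narrower immutable type cannot be used there. The plain-Scala sketch below reproduces the situation; applySelect is a hypothetical stand-in for the library call, not Flink's API. Widening the parameter type, or copying the map with toMap, both compile.

```scala
object MapTypeDemo {
  final case class MyEventType(name: String)

  // Hypothetical stand-in for a library that passes a scala.collection.Map
  // (here backed by a mutable map) to the user's callback.
  def applySelect[R](f: scala.collection.Map[String, Iterable[MyEventType]] => R): R = {
    val pattern: scala.collection.Map[String, Iterable[MyEventType]] =
      scala.collection.mutable.Map(
        "first" -> List(MyEventType("start")),
        "second" -> List(MyEventType("end")))
    f(pattern)
  }

  // Unqualified Map = scala.collection.immutable.Map: too narrow for applySelect.
  def strict(pattern: Map[String, Iterable[MyEventType]]): MyEventType =
    pattern("second").head

  // Accepts any scala.collection.Map, including the one applySelect passes.
  def general(pattern: scala.collection.Map[String, Iterable[MyEventType]]): MyEventType =
    pattern("second").head

  def main(args: Array[String]): Unit = {
    // applySelect(strict(_))                    // does not compile: type mismatch
    println(applySelect(general(_)))             // option 1: widen the parameter type
    println(applySelect(p => strict(p.toMap)))   // option 2: copy into an immutable Map
  }
}
```

Widening the parameter avoids a copy per match; toMap is handy when the function is also called elsewhere with immutable maps.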





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-1-3-scala-cannot-apply-select-function-tp13824.html


Re: FlinkCEP latency/throughput

2017-05-19 Thread Sonex
Hello Kostas,

Thanks for your response. Regarding throughput, it makes sense.

But one question remains: how can I measure the latency of my FlinkCEP
application?

Maybe you answered it, but I didn't quite get that. As for your second
question about measuring latency, the answer is yes: the first element of a
matching pattern will inevitably wait longer than the last one.

Thank you for your time!



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-latency-throughput-tp13170p13221.html


FlinkCEP latency/throughput

2017-05-16 Thread Sonex
Hello everyone,

I am testing some patterns with FlinkCEP and I want to measure latency and
throughput when using one or more processing cores. How can I do that?

What I have done so far:
Latency: each time an event arrives, I record the system time
(System.currentTimeMillis). When Flink calls the select function, which means
we have a complete pattern match, I take the system time again. For now, the
latency is the difference between the time recorded for the first event of
the complex event and the time at which the select function is called.
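The latency definition above can be sketched as a pure function over the matched events. The sketch assumes a hypothetical TimedEvent type that carries the wall-clock time (ms) recorded on arrival, and a map shaped like the select function's input. The caller passes in the current time (e.g. System.currentTimeMillis inside the select function). It returns the first-event latency described above and, for comparison, the latency measured from the last event, i.e. the time spent after the match became complete.

```scala
object LatencySketch {
  // Hypothetical event type: carries the wall-clock time (ms) at which it entered the job.
  final case class TimedEvent(id: String, arrivalMillis: Long)

  // Returns (latency from first matched event, latency from last matched event).
  def latencies(pattern: scala.collection.Map[String, Iterable[TimedEvent]],
                nowMillis: Long): (Long, Long) = {
    val arrivals = pattern.values.flatten.map(_.arrivalMillis)
    val fromFirst = nowMillis - arrivals.min // the definition used in the post above
    val fromLast = nowMillis - arrivals.max  // excludes time spent waiting for later events
    (fromFirst, fromLast)
  }

  def main(args: Array[String]): Unit = {
    val matched = Map(
      "first" -> List(TimedEvent("a", 1000L)),
      "second" -> List(TimedEvent("b", 1040L)))
    println(latencies(matched, nowMillis = 1100L)) // (100,60)
  }
}
```

Comparing wall-clock times taken in different operators is only meaningful if those operators run on machines with synchronized clocks.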

Throughput: I divide the total number of events in the dataset by the time
taken to complete the experiment.
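The throughput definition above amounts to events divided by elapsed seconds. A minimal sketch follows, assuming the start and end times are wall-clock milliseconds taken around the experiment. In a Flink job, the JobExecutionResult returned by env.execute also offers getNetRuntime, which could supply the elapsed time instead.

```scala
object ThroughputSketch {
  // Events per second over the whole run: total events / elapsed wall-clock seconds.
  def eventsPerSecond(totalEvents: Long, startMillis: Long, endMillis: Long): Double = {
    require(endMillis > startMillis, "the run must take a positive amount of time")
    totalEvents.toDouble / ((endMillis - startMillis) / 1000.0)
  }

  def main(args: Array[String]): Unit =
    println(eventsPerSecond(totalEvents = 1000000L, startMillis = 0L, endMillis = 20000L)) // 50000.0
}
```

Note that this includes job start-up and shutdown time, so it slightly underestimates steady-state throughput.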



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-latency-throughput-tp13170.html


Re: Windows emit results at the end of the stream

2017-03-28 Thread Sonex
I have prepared a small dummy dataset (data.txt) as follows:

Hello|5
Hi|15
WordsWithoutMeaning|25
AnotherWord|34
HelloWorld|46
HelloPlanet|67
HelloFlinkUsers|89
HelloProgrammers|98
DummyPhrase|105
AnotherDummy|123

And below is the code:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object WindowTest {
  def main(args: Array[String]): Unit = {
    val parallelism = 8
    // sliding step (ms)
    val slidingStep = 30
    // window size (ms)
    val windowSize = 30

    // start the streaming environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging
    // set the degree of parallelism
    env.setParallelism(parallelism)
    // set the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputFormat = new TextInputFormat(new Path("data.txt"))
    env.readFile(inputFormat, "data.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)
      // each line has the form "word|timestamp"
      .map { element =>
        val partsOfElement = element.split("[|]")
        (partsOfElement.head, partsOfElement.last.toLong)
      }
      .assignAscendingTimestamps(_._2)
      .keyBy(_._1)
      .timeWindow(Time.milliseconds(windowSize), Time.milliseconds(slidingStep))
      .apply(new Test)

    env.execute()
  }
}


And the test class is the following:

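class Test extends WindowFunction[(String, Long), String, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)],
                     out: Collector[String]): Unit = {
    // print the key together with the end time of the window being processed
    println(s"$key -- ${window.getEnd}")
    // emit the first element of the window as its result
    out.collect(input.head._1)
  }
}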


Each window result is simply the first element of the iterable, and when a
window is processed, the function prints the key together with the window's
end time. With the parallelism set to 8 as above, it emits nothing. If we
decrease the parallelism to 4, it only emits results from the first window.
You can run the code above and test it yourself.
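One explanation consistent with these symptoms, offered as a hypothesis rather than as the conclusion of this thread: Flink documents that an operator with several input channels sets its event time to the minimum of the watermarks received on those channels. With parallelism 8 and only ten input lines, some parallel timestamp-assigner instances may never see an element. Their watermark would then stay at its initial value and hold back every window downstream, which cannot happen with parallelism 1. In addition, PROCESS_CONTINUOUSLY keeps the source running, so no final end-of-input watermark is emitted. The plain-Scala sketch below models only the minimum rule; the channel values are illustrative.

```scala
object WatermarkSketch {
  // Event-time clock of an operator with several input channels: the minimum of the
  // latest watermark seen on each channel. A channel that has not sent any watermark
  // yet is represented by Long.MinValue.
  def combinedWatermark(channelWatermarks: Seq[Long]): Long =
    if (channelWatermarks.isEmpty) Long.MinValue else channelWatermarks.min

  // A window ending at `windowEnd` (exclusive) can fire once the clock reaches windowEnd - 1.
  def windowCanFire(windowEnd: Long, channelWatermarks: Seq[Long]): Boolean =
    combinedWatermark(channelWatermarks) >= windowEnd - 1

  def main(args: Array[String]): Unit = {
    // Illustrative values: 8 upstream instances, only 3 of which ever saw data.
    val busy = Seq(88L, 97L, 122L)
    val idle = Seq.fill(5)(Long.MinValue)
    println(windowCanFire(30L, busy))         // true: every channel has advanced
    println(windowCanFire(30L, busy ++ idle)) // false: idle channels hold the clock back
  }
}
```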



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337p12433.html


Re: Windows emit results at the end of the stream

2017-03-27 Thread Sonex
The degree of parallelism in the experiments I mentioned is 8. If I decrease
the parallelism, it emits more results. If I set the parallelism to 1, it
emits results for the entire dataset (i.e., it behaves as expected). What
could be the reason for this?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337p12410.html


Re: Windows emit results at the end of the stream

2017-03-24 Thread Sonex
Hello Till,

Yes, elements do have an associated timestamp, which is parsed in the first
map function.

Yes, indeed, if all timestamps lie within one hour, the triggering will happen
after the complete file has been read. I had the wrong window size and
sliding step for one of the datasets I was testing (I tested on several
datasets).

Now that I have solved the problem of windows not triggering from the
beginning, one problem remains in all my tests. Assume that I have a dataset
covering 50 hours of events. Triggering happens as expected, but when
processing reaches the 6th hour it stops and does not continue (all parallel
operators remain idle).

Thanks,
Sonex



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337p12403.html


Re: Windows emit results at the end of the stream

2017-03-23 Thread Sonex
Thank you for your response, Yassine,

I forgot to mention that I use the Scala API. In Scala the equivalent code
is:

val inputFormat = new TextInputFormat(new Path("file/to/read.txt"))
env.readFile(inputFormat, "file/to/read.txt",
  FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)

Am I correct?

But now I have noticed some odd behavior. Sometimes it never starts
processing the elements of the file, and sometimes it stops in the middle of
the file without processing the rest of it. Why does that happen?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337p12356.html


Windows emit results at the end of the stream

2017-03-23 Thread Sonex
Hi everyone,

I am using a simple window computation on a stream with event time. The code
looks like this:

streamData.readTextFile(...)
  .map(...)
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.id)
  .timeWindow(Time.seconds(3600), Time.seconds(3600))
  .apply(new MyWindowFunction)
  .map(...)

By monitoring memory usage and the Flink web dashboard, I noticed that Flink
keeps applying the window function until the entire stream has finished (thus
storing all aggregations in memory) and only then continues to the map
transformation. What I would expect is that window results are emitted to the
map transformation as soon as each window's result is ready.

Can anyone explain this behavior?
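For background on when event-time windows emit: a window [start, start + size) can fire only once the watermark has reached start + size - 1. If all timestamps fall within the first hour, that point is only reached when the bounded input ends. The plain-Scala sketch below computes the tumbling window an element falls into and whether a given watermark lets that window fire. It is a simplification (non-negative timestamps, zero window offset), not Flink's implementation.

```scala
object TumblingWindowSketch {
  // Start of the tumbling window of length `size` ms that contains `timestamp`
  // (simplified: non-negative timestamps, zero offset).
  def windowStart(timestamp: Long, size: Long): Long = {
    require(timestamp >= 0 && size > 0)
    timestamp - timestamp % size
  }

  // The window [start, start + size) covers timestamps up to start + size - 1, so an
  // event-time trigger can fire once the watermark has reached that value.
  def canFire(start: Long, size: Long, watermark: Long): Boolean =
    watermark >= start + size - 1

  def main(args: Array[String]): Unit = {
    val hour = 3600L * 1000L
    val start = windowStart(1800000L, hour) // an element 30 minutes into the stream
    println(start)                          // 0
    println(canFire(start, hour, 1799999L)) // false: watermark still inside the hour
    println(canFire(start, hour, hour - 1)) // true
  }
}
```

With Time.seconds(3600), every element timestamped in [0, 3600000) ms lands in the first window, which fires only after an element from a later hour has advanced the watermark, or when the input ends.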



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337.html


Re: load balancing of keys to operators

2017-03-20 Thread Sonex
Thanks for your response.

When using time windows, doesn't Flink know the load per window? I have
observed this behavior with windows as well.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/load-balancing-of-keys-to-operators-tp12303p12308.html


load balancing of keys to operators

2017-03-20 Thread Sonex
I am using a simple streaming job where I use keyBy on the stream to process
events per key. The number of keys may vary (from a few to thousands). I have
noticed a behavior of Flink that I need clarification on. When we use keyBy
on the stream, Flink assigns keys to parallel operator instances, so each
instance can handle events per key independently. Once a key is assigned to
an operator, can it later be moved to a different operator? From what I've
seen, the answer is no.

For example, let's assume that keys 1 and 2 are assigned to operator A and
keys 3 and 4 are assigned to operator B. If at some later point there is a
burst of data for key 1, while keys 2, 3 and 4 have only a little data, will
key 2 be reassigned to operator B to balance the load? If not, is there a way
to do that? And if not, why doesn't Flink do that?
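The observation matches how Flink's key groups work. Each key is hashed into one of maxParallelism key groups, and each parallel operator instance owns a contiguous range of key groups, computed as keyGroup * parallelism / maxParallelism. The mapping depends only on the key and the parallelism settings, not on load, so keys stay put while the job runs. They are redistributed only when the job is restarted with a different parallelism, for example from a savepoint. The sketch below is a simplified model: Flink additionally applies a murmur hash to key.hashCode, which is omitted here (assumption), and maxParallelism = 128 is chosen for illustration.

```scala
object KeyAssignmentSketch {
  // Simplified key-group assignment. Flink additionally scrambles key.hashCode with a
  // murmur hash; this sketch uses floorMod of hashCode directly (assumption).
  def keyGroup(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Each operator instance owns a contiguous range of key groups.
  def operatorIndex(key: Any, maxParallelism: Int, parallelism: Int): Int =
    keyGroup(key, maxParallelism) * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128
    // The result depends only on the key and the two parallelism settings,
    // never on how many events a key has produced so far.
    for (parallelism <- Seq(2, 4); key <- Seq(1, 50, 100))
      println(s"parallelism $parallelism: key $key -> operator ${operatorIndex(key, maxParallelism, parallelism)}")
  }
}
```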



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/load-balancing-of-keys-to-operators-tp12303.html


Re: Re: Transfer information from one window to the next

2017-02-22 Thread Sonex
I solved it using the state you were talking about.

The solution would look like this (similar to what you wrote):

stream.keyBy(...).timeWindow(...)
  .apply { (key: K, window: W, elements: Iterable[IN], out: Collector[(K, W, Iterable[IN])]) =>
    out.collect((key, window, elements))
  }
  .keyBy(_._1) // use the same key as the windows
  .mapWithState(...) // process the windows with shared information
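The pattern above feeds the window results, re-keyed by the same key, into mapWithState. That operator takes a function of shape (element, Option[state]) => (output, Option[newState]) and calls it with the state stored for the element's key. The plain-Scala sketch below simulates that per-key state locally with a mutable map, so the hand-off from one window to the next can be seen without running Flink. WindowResult and its fields are hypothetical.

```scala
object CarryStateSketch {
  // Hypothetical shape of one window result per key.
  final case class WindowResult(key: Int, windowEnd: Long, sum: Long)

  // Same shape as a mapWithState function: the element plus this key's previous
  // state in, an output plus the new state out (None would clear the state).
  def step(w: WindowResult, prev: Option[Long]): (String, Option[Long]) = {
    val out = prev match {
      case Some(p) => s"key ${w.key} @ ${w.windowEnd}: sum ${w.sum}, previous $p"
      case None    => s"key ${w.key} @ ${w.windowEnd}: sum ${w.sum}, no previous window"
    }
    (out, Some(w.sum)) // hand this window's sum to the next window of the same key
  }

  // Local simulation of keyed state: one entry per key, updated in arrival order.
  def run(results: Seq[WindowResult]): Seq[String] = {
    val state = scala.collection.mutable.Map.empty[Int, Long]
    results.map { w =>
      val (out, next) = step(w, state.get(w.key))
      next match {
        case Some(s) => state(w.key) = s
        case None    => state.remove(w.key)
      }
      out
    }
  }

  def main(args: Array[String]): Unit =
    run(Seq(
      WindowResult(1, 3600000L, 10L),
      WindowResult(2, 3600000L, 7L),
      WindowResult(1, 7200000L, 4L))).foreach(println)
}
```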





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11738p11805.html


Re: Re: Re: Transfer information from one window to the next

2017-02-21 Thread Sonex
Hi, and thank you for your response.

Is it possible to give me a simple example? How can I put the variable into
state and then access that state in the next apply function?

I am new to Flink.

Thank you.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11738p11766.html


Re: Is a new window created for each key/group?

2017-02-20 Thread Sonex
Yes, you are correct. A window is created for each key/group, and then you
can apply a function or aggregate elements per key.
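To illustrate the answer: with keyBy followed by a time window, each key gets its own window instances. The plain-Scala sketch below groups hypothetical events by (key, window start) and sums a field per group. This is roughly what a keyed tumbling window followed by an aggregation computes; the sketch ignores watermarks and triggering entirely and assumes non-negative timestamps.

```scala
object PerKeyWindowsSketch {
  // Hypothetical event type.
  final case class Event(key: String, timestamp: Long, value: Int)

  // Tumbling windows per key: a window instance is identified by (key, window start),
  // so events of different keys never share a window even if their times overlap.
  def sumPerKeyAndWindow(events: Seq[Event], size: Long): Map[(String, Long), Int] =
    events
      .groupBy(e => (e.key, e.timestamp - e.timestamp % size))
      .map { case (window, es) => window -> es.map(_.value).sum }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event("a", 5L, 1), Event("a", 15L, 2), Event("b", 25L, 3), Event("a", 34L, 4))
    sumPerKeyAndWindow(events, 30L).foreach(println)
  }
}
```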



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-a-new-window-created-for-each-key-group-tp11745p11746.html


Re: Re: Transfer information from one window to the next

2017-02-20 Thread Sonex
I don't think you understood the question correctly. I do not care about
sharing information between windows that cover the same time span (e.g.,
start of window = 0, end of window = 3600). I want to pass a variable, say
for key 1, from the apply function of window 0-3600 to the apply function of
window 3600-7200 for the same key.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11738p11739.html


Transfer information from one window to the next

2017-02-20 Thread Sonex
val stream = inputStream
  .assignAscendingTimestamps(_.eventTime)
  .keyBy(_.inputKey)
  .timeWindow(Time.seconds(3600), Time.seconds(3600))

stream.apply{...}

Given the above example, I want to transfer information (variables and
values) from the current apply function to the apply function of the next
window (for the same key, of course).

How can I do that?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11737.html


Re: Cartesian product over windows

2017-02-17 Thread Sonex
Hi Till,

When you say parallel windows, what do you mean? Do you mean using
timeWindowAll, which puts all the elements of a window in a single task?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cartesian-product-over-windows-tp11676p11716.html