Re: How to run a flink wordcount program

2017-08-17 Thread Chao Wang

I think the following quickstart offers end-to-end instructions:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html
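
If I remember that page correctly, the short version is: download and unpack a binary release, start a local cluster with bin/start-local.sh, open a text socket with something like nc -l 9000, and then submit the bundled example via bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000; the word counts end up in the .out file under log/. Building the whole GitHub repository is not necessary just to run the example.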

Chao


On 08/17/2017 08:25 AM, P. Ramanjaneya Reddy wrote:


Hi ALL,

I'm new to Flink and am trying to understand the Flink wordcount program,
so I downloaded the GitHub repository:
https://github.com/apache/flink.git


Can somebody explain how to run the wordcount example?

Thanks
Ramanjaneya






Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

2017-08-16 Thread Chao Wang

Thank you, Nico. That helps a lot!

2a) That really clarifies my understanding of Flink. Yes, I think I have 
effectively shared a static reference, since I invoked a native function 
(implemented through JNI) which, I believe, has only one instance per 
process. And I guess the reason those Java synchronization mechanisms 
were in vain is that there are separate function objects at runtime, 
which results in separate lock objects. Now I use a C++ mutex within the 
native function, and that resolves my case.
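
In case it helps others, here is a minimal sketch of the Java-side alternative: one lock shared across all function instances (i.e., static), since per-instance locks do not help when the contended resource is per-process. The native call EKFFFTAES.process() below is hypothetical, just standing in for my JNI function:

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public static class GuardedCoFlatMap implements CoFlatMapFunction<byte[], byte[], byte[]> {
  // One lock per JVM process, shared by all function instances,
  // matching the single per-process instance of the native library.
  private static final Object NATIVE_LOCK = new Object();

  @Override
  public void flatMap1(byte[] value, Collector<byte[]> out) {
    synchronized (NATIVE_LOCK) {
      out.collect(EKFFFTAES.process(value)); // hypothetical native call
    }
  }

  @Override
  public void flatMap2(byte[] value, Collector<byte[]> out) {
    synchronized (NATIVE_LOCK) {
      out.collect(EKFFFTAES.process(value)); // hypothetical native call
    }
  }
}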


BTW, could you elaborate a bit more on what you mean by "per-record 
basis"? What do you mean by a record?


3) I do not intend to store the CoProcessFunction.Context. I was just 
wondering whether I can use it for maintaining the custom state of my 
program logic, since the documentation says it is only valid during the 
invocation; I guess I cannot.



Thank you,
Chao


On 08/16/2017 03:31 AM, Nico Kruber wrote:

Hi Chao,

1) regarding the difference between CoFlatMapFunction and CoProcessFunction, let
me quote the javadoc of CoProcessFunction:

"Contrary to the {@link CoFlatMapFunction}, this function can also query the
time (both event and processing) and set timers, through the provided {@link
Context}. When reacting to the firing of set timers the function can emit yet
more elements."

So, imho, both deliver a different level of abstraction and control (high- vs.
low-level). Also note the different methods available for you to implement.
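
A minimal sketch of what that low-level control looks like (the types and the timeout value are illustrative, not from this thread):

import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public static class TimerExample extends CoProcessFunction<String, String, String> {
  @Override
  public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
    // Query the current time and set a timer, which a CoFlatMapFunction cannot do.
    long now = ctx.timerService().currentProcessingTime();
    ctx.timerService().registerProcessingTimeTimer(now + 5000);
    out.collect("input1: " + value);
  }

  @Override
  public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
    out.collect("input2: " + value);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    // When the timer fires, the function can emit yet more elements.
    out.collect("timer fired at " + timestamp);
  }
}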

2a) In general, Flink calls functions on a per-record basis, in a serialized
fashion per task. For each task at a TaskManager, in case it has multiple
slots, separate function objects are used, so you should only get into trouble
if you share static references. Otherwise you do not need to worry about
thread-safety.

2b) From what I see in the code (StreamTwoInputProcessor), the same should
apply to CoFlatMapFunction and CoProcessFunction, so that flatMap1/2 and
processElement1/2 are not invoked in parallel!

3) why would you want to store the CoProcessFunction.Context?


Nico

On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:

Hi,

I'd like to know whether CoFlatMapFunction/CoProcessFunction is thread-safe,
and to what extent. What's the difference between the two functions? And, in
general, how does Flink prevent race conditions? Here's my case:

I tried to condition on two input streams and produce a third stream if the
condition is met. I implemented CoFlatMapFunction and tried to monitor a state
using a field in the implementing class (I want to isolate my application from
the checkpointing feature, and therefore I do not use the state facilities
documented at
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
The field served as a flag indicating whether there are pending data from
either input stream; if so, the pending data are processed along with the
arriving data from the other input stream (the processing invokes a native
function).

But then I got double-free errors and segmentation faults, which I believe
were due to unintentional concurrent access to the native function. I tried to
wrap the access in a synchronized method, as well as explicitly lock/unlock in
the flatMap1/flatMap2 methods, but all in vain; the errors remained.

I considered using CoProcessFunction for my case, but it seems to me that it
does not support custom internal state, as its javadoc states: "The context
[CoProcessFunction.Context] is only valid during the invocation of this
method, do not store it."



Thanks,
Chao




Re: Experiencing long latency while using sockets

2017-08-09 Thread Chao Wang
It seems that the observed long latencies were due to some one-time 
internal mechanism that occurs only after Flink has received its first 
message. Based on my measurements, that mechanism took around 100 ms.


Now I set up my application the following way, and I observed an 
end-to-end latency similar to that of using raw sockets (off by less 
than 1 ms): send the first message to Flink and then wait 110 ms 
before sending the second message. For subsequent sends the 110 ms 
wait can be dropped.
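
A sketch of the sending side with this warm-up (host, port, payload layout and message count are illustrative):

import java.io.OutputStream;
import java.net.Socket;

public class WarmupSender {
  public static void main(String[] args) throws Exception {
    try (Socket socket = new Socket("FlinkHost", 6065)) { // illustrative host/port
      OutputStream out = socket.getOutputStream();
      byte[] message = new byte[1 + 16 + 512];  // illustrative type/timestamp/payload layout
      out.write(message);                       // the first message triggers the one-time cost
      out.flush();
      Thread.sleep(110);                        // wait out the ~100 ms one-time mechanism
      for (int i = 0; i < 10; i++) {            // subsequent sends need no wait
        out.write(message);
        out.flush();
      }
    }
  }
}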



Chao

On 08/09/2017 10:57 AM, Chao Wang wrote:


Thank you, Fabian.

Maybe there are also some buffers sitting between the data source and the 
first operator? I observed in my implementation of SourceFunction 
(using a socket server, as listed in the previous email) that, for 
two received messages, in terms of event time it took 0.2 ms for 
the SourceFunction to receive the first message but then 97 ms 
to receive the second. The interval between the two sends is 
0.07 ms at the sending side, which is a Java socket client.


Or could it be that there is a timeout setting for scheduling data 
sources in Flink?



Thanks,

Chao


On 08/08/2017 02:58 AM, Fabian Hueske wrote:
One pointer is the StreamExecutionEnvironment.setBufferTimeout() 
parameter.
Flink's network stack collects records in buffers to send them over 
the network. A buffer is sent when it is completely filled or after a 
configurable timeout.
So if your program does not process many records, these records might 
"get stuck" in the buffers and be emitted after the timeout flushes 
the buffer.

The default timeout is 100ms. Try to reduce it.

Best, Fabian
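
For reference, lowering the timeout is a one-liner on the execution environment; the 5 ms value below is just an example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(5); // flush network buffers every 5 ms instead of the default 100 ms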

Re: Experiencing long latency while using sockets

2017-08-07 Thread Chao Wang
Following up on the original post, I've tried stripping my Flink app down 
to only the following, and it still exhibits long latencies: after the 
second write to the source socket, it took 90+ milliseconds for data to 
get from the data source to the socket front-end in Flink. I would like 
to ask for pointers on how to investigate a latency issue like this, and 
in general how to properly benchmark Flink latencies. Thank you very much!



The main method:


  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<EventGroup> inEventGroupStream =
        env.addSource(new SocketEventGroupStreamFunction(6065, 512));
    inEventGroupStream.writeToSocket("DestHost", 6066, new MySeGroup());

    env.execute("event processing");
  }


where all the custom classes are as follows (for 
serialization/deserialization and socket server functionality):



  public static class MySeGroup implements SerializationSchema<EventGroup> {

    @Override
    public byte[] serialize(EventGroup arg0) {
      int tLength = EKFFFTAES.getSizeTimepoint();
      // Report an error if tLength != arg0.getT().length
      if (tLength != arg0.getT().length) {
        System.out.println("Serialization error: Timepoint size discrepancy.");
        System.out.println("tLength = " + tLength);
        System.out.println("arg0.getT().length = " + arg0.getT().length);
      }
      byte[] buffer = new byte[1 + arg0.getT().length + arg0.getP().length];
      buffer[0] = arg0.type;
      System.arraycopy(arg0.getT(), 0, buffer, 1, tLength);
      System.arraycopy(arg0.getP(), 0, buffer, 1 + tLength, arg0.getP().length);
      return buffer;
    }
  }

  public static class Event extends SimpleImmutableEntry<byte[],byte[]> {

Event(byte[] timestamp, byte[] payload){
  super(timestamp, payload);
}
public byte[] getT() { // get the timestamp
  return getKey();
}
public byte[] getP() { // get the payload
  return getValue();
}
  }

  public static class EventGroup extends Event {
public byte type;
EventGroup(byte type, byte[] timestamp, byte[] payload){
  super(timestamp, payload);
  this.type = type;
}
  }


  public static class SocketEventGroupStreamFunction implements SourceFunction<EventGroup> {


private transient ServerSocket serverSocket;
private int serverPort;
private int dataLength;
private byte[] inbuf;
private byte[] timestamp;
private byte[] payload;
private int tLength = EKFFFTAES.getSizeTimepoint();
private volatile boolean isRunning = true;

public SocketEventGroupStreamFunction(int port, int length) {
  serverPort = port;
  dataLength = length;
  inbuf = new byte[1 + dataLength + tLength];
  timestamp = new byte[tLength];
  payload = new byte[dataLength];
}

    @Override
    public void run(SourceContext<EventGroup> ctx) throws Exception {
      while (isRunning) {
        serverSocket = new ServerSocket(serverPort, 100,
            InetAddress.getByName("192.168.1.13"));
        // Note: with this timeout set, accept() throws a SocketTimeoutException
        // if no client connects within 100 ms.
        serverSocket.setSoTimeout(100);
        System.out.println("Waiting for incoming connections on port " +
            serverSocket.getLocalPort() + "...");
        Socket server = serverSocket.accept();

        System.out.println("Just connected to " + server.getRemoteSocketAddress());

        DataInputStream in = new DataInputStream(server.getInputStream());

        while (isRunning) {
          // Read one framed message: 1 type byte, tLength timestamp bytes,
          // dataLength payload bytes.
          in.readFully(inbuf, 0, inbuf.length);
          System.arraycopy(inbuf, 1, timestamp, 0, tLength);
          System.arraycopy(inbuf, 1 + tLength, payload, 0, dataLength);

          System.out.print("Got an event " + inbuf[0] + ": ");
          displayElapsedTime(timestamp);

          ctx.collect(new EventGroup(inbuf[0], timestamp, payload));
        }
      }
    }

@Override
public void cancel() {
  isRunning = false;
  ServerSocket theSocket = this.serverSocket;
  if (theSocket != null) {
try {
  theSocket.close();
}catch(SocketTimeoutException s) {
  System.out.println("Socket timed out!");
}catch(IOException e) {
  e.printStackTrace();
}
  }
}
  }


and finally, EKFFFTAES is my C++ library implementing the timestamping 
facility:



int timePointLength = sizeof(std::chrono::system_clock::time_point);

JNIEXPORT jint JNICALL Java_eventProcessing_EKFFFTAES_getSizeTimepoint
  (JNIEnv *, jclass)
{
  return ::timePointLength;
}

JNIEXPORT void JNICALL Java_eventProcessing_EKFFFTAES_displayElapsedTime
  (JNIEnv *env, jclass, jbyteArray inArray)
{
  std::chrono::system_clock::time_point end =
std::chrono::system_clock::now();
  jbyte *inCArray = env->GetByteArrayElements(inArray, NULL);
  std::chrono::system_clock::time_point start;
  std::memcpy(&start, inCArray, ::timePointLength);
  std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
            << " ms" << std::endl;
  env->ReleaseByteArrayElements(inArray, inCArray, JNI_ABORT);
}

Experiencing long latency while using sockets

2017-08-07 Thread Chao Wang

Hi,

I have been trying to benchmark the end-to-end latency of a Flink 1.3.1 
application, but got confused regarding the amount of time spent in 
Flink. In my setting, the data source and data sink dwell on separate 
machines, in the following topology:


Machine 1                            Machine 2       Machine 3
data source (via a socket client) -> Flink        -> data sink (via a socket server)


I observed 200-400 milliseconds of end-to-end latency, while the execution 
of my stream transformations took no more than two milliseconds, the 
socket-only networking latency between machines is no more than one 
millisecond, and I used ptpd so that the clock offset between machines 
was also no more than one millisecond.


Question: What took those hundreds of milliseconds?

Here are the details of my setting and my observation so far:

On Machine 2, I implemented a socket server as a data source to Flink 
(by implementing SourceFunction), and I split the incoming stream into 
several streams (by SplitStream) for some transformations (implementing 
MapFunction and CoFlatMapFunction), where the results were fed to a socket 
(using writeToSocket). I used C++11's chrono library (through JNI) 
to take timestamps and determine the elapsed time, and I have verified 
that the overhead of timestamping this way is no more than one millisecond.


I observed that for four consecutive writes from Machine 1, with no more 
than 0.3 milliseconds between writes, Flink on Machine 2 got the first 
write within 0.2 milliseconds, but it then took 90 milliseconds to get 
the second write, another 4 milliseconds for the third, and yet another 
4 milliseconds for the fourth.


And then it took more than 70 milliseconds before Flink started 
processing my plan's first stream transformation. And after my last 
transformation, it took more than 70 milliseconds before the result was 
received at Machine 3.



Thank you,

Chao




Re: schema to just read as "byte[] array" from kafka

2017-08-07 Thread Chao Wang

A quick update, in class MyDe:

public static class MyDe extends AbstractDeserializationSchema<byte[]> {
  @Override
  public byte[] deserialize(byte[] arg0) {
// Perform deserialization here, if needed;
// otherwise, probably we can simply return arg0 as raw byte[]
return arg0;
  }
}
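
And wiring it up end to end, for completeness (the broker address, group id and topic name below are placeholders):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
properties.setProperty("group.id", "my-group");                // placeholder group id

FlinkKafkaConsumer010<byte[]> consumer =
    new FlinkKafkaConsumer010<>("topic_name", new MyDe(), properties);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<byte[]> raw = env.addSource(consumer); // each record arrives as a raw byte[]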


Chao

On 08/07/2017 12:23 PM, Chao Wang wrote:


Hi Raja,

I just happened to work on a similar thing, and here is how to do it 
in general, I think (in my case I did a bit more, deserializing a 
tuple of <byte[],byte[]>):


FlinkKafkaConsumer010<byte[]> consumer = new 
FlinkKafkaConsumer010<>("topic_name", new MyDe(), properties);


and for MyDe the schema:

public static class MyDe extends AbstractDeserializationSchema<byte[]> {
  @Override
  public byte[] deserialize(byte[] arg0) {
    return arg0;
  }
}


Chao

On 08/07/2017 10:47 AM, Raja.Aravapalli wrote:


Hi

I am using SimpleStringSchema to deserialize messages read from 
kafka, but need some help to know if there is any schema available I 
can use rather than "SimpleStringSchema()" to instead just get 
"byte[]" without any deserialization happening!


Below is the code I am currently using; instead of SimpleStringSchema(), 
which gives me Strings, I want a raw byte array byte[]:


FlinkKafkaConsumer08<String> myConsumer = new 
FlinkKafkaConsumer08<>("xxx_topic", new SimpleStringSchema(), 
properties);


Thanks a lot.

Regards,

Raja.







Re: CEP condition expression and its event consuming strategy

2017-08-03 Thread Chao Wang
Thank you, Dawid. FYI, I've implemented the discarding logic with a 
CoFlatMapFunction, for the special case where there are only two input 
streams: I maintain a logical state (no match, input1 matched, or input2 
matched) and use private variables to store the matched event so far, 
which then waits to be processed along with the event from the other 
input; a sketch follows below.


Chao
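
A minimal sketch of that discarding logic, with illustrative String event types (the real version processes the joined data instead of just concatenating it); the pending event is discarded as soon as it has been matched, so it cannot take part in a second match:

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public static class DiscardingMatch implements CoFlatMapFunction<String, String, String> {
  // Logical state: 0 = no match yet, 1 = input1 matched, 2 = input2 matched.
  private int matchedSide = 0;
  private String pending;  // the matched event stored so far

  @Override
  public void flatMap1(String value, Collector<String> out) {
    if (matchedSide == 2) {           // the other side is waiting: emit and discard both
      out.collect(pending + "+" + value);
      pending = null;
      matchedSide = 0;
    } else {                          // remember this event and wait for the other side
      pending = value;
      matchedSide = 1;
    }
  }

  @Override
  public void flatMap2(String value, Collector<String> out) {
    if (matchedSide == 1) {
      out.collect(pending + "+" + value);
      pending = null;
      matchedSide = 0;
    } else {
      pending = value;
      matchedSide = 2;
    }
  }
}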


On 07/31/2017 02:13 AM, Dawid Wysakowicz wrote:

Ad. 1 Yes, it returns an Iterable to support times and oneOrMore patterns 
(which can accept more than one event).

Ad. 2 A use case for not discarding used events could be, e.g., looking for 
certain shapes in the data, such as W-shapes. In that case, one W-shape could 
start on the middle peak of the previous one.

Unfortunately, I personally can't point you to any in-use applications. Maybe 
Kostas, whom I've added to the discussion, knows of some.

Anyway, thanks for your interest in the CEP library. We will be happy to hear 
any comments and suggestions for future improvements.




On 28 Jul 2017, at 21:54, Chao Wang <chaow...@wustl.edu> wrote:

Hi Dawid,

Thank you.

Ad. 1 I noticed that the method getEventsForPattern() returns an Iterable, and 
we need to further invoke .iterator().next() to get access to the event value.

Ad. 2 Here is a bit about a use case we have that calls for such discarding 
semantics. In the event processing project I am currently working on, input 
event streams are sensor data, and we join streams and do Kalman filtering, 
FFT, etc. We therefore choose to discard the accepted events once the data they 
carry have been processed; otherwise, it may cause duplicated processing as 
well as incorrect join semantics.

We came up with this question while doing an empirical comparison of Flink and 
our system (implemented with the TAO real-time event service). We implemented 
such semantics in our system by removing input events once CEP emits the 
corresponding output events.

Could you provide some use cases where the discarding semantics are not needed? 
I guess I am wired into processing sensor data and thus cannot think of a case 
where reusing accepted events would be of interest. Also, could you share some 
pointers to streaming applications in use? We are seeking to make our research 
work more relevant to current practice.

Thank you very much,

Chao

On 07/27/2017 02:17 AM, Dawid Wysakowicz wrote:

Hi Chao,

Ad. 1 You could implement it with IterativeCondition. Something like this:

Pattern<Event, ?> pattern = Pattern.<Event>begin("first").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.equals("A") || value.equals("B");
  }
}).followedBy("second").where(new IterativeCondition<Event>() {
  @Override
  public boolean filter(Event value, Context<Event> ctx) throws Exception {
    // compare against the event accepted for "first" (getEventsForPattern returns an Iterable)
    return (value.equals("A") || value.equals("B"))
        && !value.equals(ctx.getEventsForPattern("first").iterator().next());
  }
});
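
For completeness, applying that pattern and emitting C would look roughly like this (EventC and the input stream are illustrative; in 1.3 the select function receives a Map<String, List<Event>>):

PatternStream<Event> matches = CEP.pattern(input, pattern);
DataStream<EventC> cs = matches.select(new PatternSelectFunction<Event, EventC>() {
  @Override
  public EventC select(Map<String, List<Event>> match) throws Exception {
    // One event was accepted for "first" and one for "second"; the A/B order does not matter.
    return new EventC(match.get("first").get(0), match.get("second").get(0));
  }
});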

Ad. 2 Unfortunately, right now, as you said, the pattern restarts at each event, 
and it is not possible to change that strategy. There is ongoing work to introduce 
AfterMatchSkipStrategy [1], but at best it will be merged in 1.4.0. I did not 
give it much thought, but I would try implementing some discarding logic.

Regards,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-7169


On 26 Jul 2017, at 22:45, Chao Wang <chaow...@wustl.edu> wrote:

Hi,

I have two questions regarding the use of the Flink CEP library 
(flink-cep_2.11:1.3.1), as follows:

1. I'd like to know how to use the API to express "emit event C in the presence of 
events A and B, with no restriction on the arrival order of A and B". I've tried 
creating two patterns, one for "A and then B" and the other for "B and then A", and 
consequently using two PatternStreams to handle each case, each emitting C. It 
worked, but to me this approach seems redundant.

2. Given the above expression, how can we consume the accepted events so that they 
will not be used for future matches? For example, with the arrival sequence 
{A, B, A}, the CEP should emit only one C (due to the match {A,B}), not two Cs (due 
to {A,B} and {B,A}). Similarly, with the arrival sequence {B, A, B, A}, the CEP 
should emit only two Cs, not three.


Thanks,

Chao





Re: Operations dependencies between values with different key in a ConnectedStreams

2017-07-28 Thread Chao Wang

Hi Gabriele,

I think CEP may be able to deal with this kind of expression in general, 
although I am not sure how to handle the two different time windows 
(5 s and 3 s, in your case). Take a look at the available patterns 
in the CEP library documentation: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html#the-pattern-api
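
For a taste of the API, a single condition with a time constraint might look roughly like this (SensorReading and the condition are illustrative, not from your application):

Pattern<SensorReading, ?> pattern = Pattern.<SensorReading>begin("data")
    .where(new SimpleCondition<SensorReading>() {
      @Override
      public boolean filter(SensorReading r) throws Exception {
        return r.id == 22; // illustrative condition
      }
    })
    .within(Time.seconds(5)); // the whole pattern must complete within 5 seconds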


Chao


On 07/28/2017 10:08 AM, Gabriele Di Bernardo wrote:

Hi guys,

I have a question for you. I have an application with two keyed data streams: 
one for control and the other one for the data. Each control message represents 
an operation to be performed on the data values marked with a certain 
identifier. I connected the two streams and I process the data with a 
CoProcessFunction.

The operations I perform are really simple, like collecting the MAX or MEAN value 
of the last n seconds. Now I would like to create more complex operations where 
the result value for one key might depend on the result value for another key. To 
be more clear, I would like to evaluate expressions like: if {ALL the values of 
data marked with id 22 in the last 5 s} are BIGGER THAN {the MEAN value of data 
marked with id 17 in the last 3 s}. In this example, I should defer the evaluation 
of the expression until I have the MEAN value of the right-hand part and then 
check it against ALL the data keyed with 22 from the last 5 seconds. I'd like to 
ask whether something like this is doable in Flink and what, in your opinion, is 
the best way to do it. I am also checking how the CEP library works (can it be 
helpful?).

Thank you so much in advance.

Best,


Gabriele



