Flink Elasticsearch connector 7 with SSL

2022-06-30 Thread shishal singh
Hi Flink community members,

In my Flink job, I am trying to use the *elasticsearch7* connector. I could not
find a way to communicate over SSL when using *Elasticsearch7SinkBuilder*. Am
I missing something?

There is also a deprecated class, *ElasticsearchSink.Builder*, which accepts a
*RestClientFactory* that I can use to set up an HTTPS connection, but since
that class is deprecated I am wondering whether the same is possible with
*Elasticsearch7SinkBuilder*.
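
For reference, this is roughly how I set it up with the deprecated builder (a
minimal sketch, not my actual job code; the host name, truststore path/password
and the index/source are placeholders):

import java.io.File;
import java.util.Collections;
import javax.net.ssl.SSLContext;

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class SslSinkSketch {

    public static ElasticsearchSink<String> buildSink(String truststorePath, String truststorePassword) {

        // Placeholder sink function; the real job builds its own IndexRequests.
        ElasticsearchSinkFunction<String> sinkFunction = (element, ctx, indexer) -> {
            IndexRequest request = Requests.indexRequest()
                    .index("my-index")
                    .source(Collections.singletonMap("data", element));
            indexer.add(request);
        };

        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                Collections.singletonList(new HttpHost("es-host.example.com", 9200, "https")),
                sinkFunction);

        // The deprecated builder exposes RestClientFactory, which lets me customize
        // the low-level REST client, including the SSLContext used for HTTPS.
        builder.setRestClientFactory(restClientBuilder ->
                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                    try {
                        SSLContext sslContext = SSLContexts.custom()
                                .loadTrustMaterial(new File(truststorePath), truststorePassword.toCharArray())
                                .build();
                        return httpClientBuilder.setSSLContext(sslContext);
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to build SSLContext", e);
                    }
                }));

        return builder.build();
    }
}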

Also, since there is no connector for *elasticsearch 8* yet, is it possible
to use either of the above clients with Elasticsearch 8?

Note: there used to be an Elasticsearch connector compatibility matrix in the
Flink docs, but I cannot find it any more in the latest version.


Re: Flink job hangs using rocksDb as backend

2018-07-23 Thread shishal singh
Thanks Stefan,

You are correct; I learned the hard way that when timers fire, the job stops
processing new events until all timer callbacks complete. That is the point
when I decided to isolate the problem by scheduling only 5-6K timers in total,
so that even if timer processing takes time, the job should progress after a
reasonable period. But even after I left it running the whole night, the
watermark did not progress at all and the CPU still shows 100% usage, without
any error log (either JM or TM). The stack trace I shared is the one I took in
the morning.

Also, to rule out any problem with the Elasticsearch sink, I removed the sink
and just did stream.print() at the end.

I am using spinning disks and set the following option:

setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); // also tried SPINNING_DISK_OPTIMIZED_HIGH_MEM

My cluster has 3 nodes (private cloud machines with 4 CPU cores each), with 1
TM with 4 slots running on each node. The JobManager and Hadoop are also
running on the same 3 nodes.

My job graph looks like this:

[image: image.png]
I am using the following config, with a checkpointing interval of 10 min and
Hadoop to store checkpoints:

RocksDBStateBackend backend = new RocksDBStateBackend(baseDir + "/checkpoints", true);
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
env.setStateBackend(backend);
env.enableCheckpointing(intervalMilli);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(intervalMilli);
env.getCheckpointConfig().setCheckpointTimeout(timeoutMilli);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


The last thing I intend to try is using FsStateBackend, to be sure whether it
is a RocksDB-related issue; the problem is that sometimes the issue only
reproduces after a couple of days.
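
For reference, the switch I have in mind is just the following (a minimal
sketch reusing the env and baseDir from the config above; FsStateBackend keeps
working state on the TM heap and writes checkpoints to the given path, HDFS in
my case):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;

env.setStateBackend(new FsStateBackend(baseDir + "/checkpoints"));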

Regards,
Shishal


On Mon, Jul 23, 2018 at 10:08 AM Stefan Richter wrote:

> Hi,
>
> let me first clarify what you mean by „stuck“, just because your job stops
> consuming events for some time does not necessarily mean that it is
> „stuck“. That is very hard to evaluate from the information we have so far,
> because from the stack trace you cannot conclude that the thread is
> „stuck“, because it looks like it is just processing firing timers. And
> while timers are firing, the pipeline will stop consuming further events
> until all timers have been processed. Even if your thread dump looks the
> same all the time, it could just be that you observe the same call (the
> most expensive one) across multiple invocations, which is not necessarily an
> indicator for the thread being stuck. Attaching a sampler or introducing
> logging to one of the seemingly stuck task JVMs could clarify this a bit
> more. For now I am assuming that it makes progress but spends a lot of work
> on timers. Why you might experience this randomly is, for example, if your
> watermark makes a bigger jump and many (or all) of your timers suddenly
> fire. From the perspective of consuming events, this could look like being
> stuck.
> In case that the job really is stuck in the strict sense, it does not look
> like a Flink problem because your threads are in some call against RocksDB.
> Since we are not aware of any similar problem from the mailing list, a
> setup problem would be the most likely explanation, e.g. what types of disk
> are you using, how many threads are available on the TM machine so that
> also RocksDB compaction, processing, async checkpointing etc. can work in
> parallel. But for now, the most important piece of information would be
> what exactly „stuck“ means in your problem.
>
> Best,
> Stefan
>
> On 20.07.2018 at 18:58, shishal singh wrote:
>
> Hi Richer,
>
> Actually for the testing , now I have reduced the number of timers to few
> thousands (5-6K) but my job still gets stuck randomly.  And its not
> reproducible each time. next time when I restart the job it again starts
> working  for few few hours/days then gets stuck again.
> I took thread dump when my job was hanged with almost 100% cpu . The most
> cpu taking thread has following stack:
>
> It look like sometimes its not able to read data from RocksDB.
>
> *"process (3/6)" #782 prio=5 os_prio=0 tid=0x7f68b81ddcf0 nid=0xee73
> runnable [0x7f688d83a000]*
> *   java.lang.Thread.State: RUNNABLE*
> * at org.rocksdb.RocksDB.get(Native Method)*
> * at org.rocksdb.RocksDB.get(RocksDB.java:810)*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:137)*
> * at
> org.apache.flink.runtime.state.Us

Re: Flink job hangs using rocksDb as backend

2018-07-20 Thread shishal singh
Hi Richter,

For testing, I have now reduced the number of timers to a few thousand (5-6K),
but my job still gets stuck randomly, and it is not reproducible each time.
When I restart the job, it starts working again for a few hours/days and then
gets stuck again.
I took a thread dump when my job was hung with almost 100% CPU. The thread
consuming the most CPU has the following stack:

It looks like sometimes it is not able to read data from RocksDB.

*"process (3/6)" #782 prio=5 os_prio=0 tid=0x7f68b81ddcf0 nid=0xee73
runnable [0x7f688d83a000]*
*   java.lang.Thread.State: RUNNABLE*
* at org.rocksdb.RocksDB.get(Native Method)*
* at org.rocksdb.RocksDB.get(RocksDB.java:810)*
* at
org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:137)*
* at
org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)*
* at
nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.isEventExist(RtpeProcessFunction.java:150)*
* at
nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:93)*
* at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)*
* at
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)*
* at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)*
* at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)*
* at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)*
* - locked <0x000302b61458> (a java.lang.Object)*
* at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)*
* at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)*
* at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)*
* at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)*
* at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)*
* at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)*
* at java.lang.Thread.run(Thread.java:748)*

*   Locked ownable synchronizers:*
* - None*

*"process (2/6)" #781 prio=5 os_prio=0 tid=0x7f68b81dcef0 nid=0xee72
runnable [0x7f688fe54000]*
*   java.lang.Thread.State: RUNNABLE*
* at org.rocksdb.RocksDB.get(Native Method)*
* at org.rocksdb.RocksDB.get(RocksDB.java:810)*
* at
org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)*
* at
org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)*
* at
nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:99)*
* at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)*
* at
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)*
* at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)*
* at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)*
* at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)*
* - locked <0x000302b404a0> (a java.lang.Object)*
* at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)*
* at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)*
* at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)*
* at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)*
* at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)*
* at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)*
* at java.lang.Thread.run(Thread.java:748)*

*   Locked ownable synchronizers:*


Regards,
Shishal


On Thu, Jul 12, 2018 at 4:11 PM Stefan Richter wrote:

> Hi,
>
> Did you check the metrics for the garbage collector? Being stuck with high CPU
> consumption and lots of timers sounds like there could be a problem there,
> because timers are currently on-heap objects, but we are working on
> RocksDB-based timers right now.
>
> Best,
> Stefan
>
> On 12.07.2018 at 14:54, shishal singh wrote:
>
> Thanks Stefan/Stephan/Nico,
>
> Indeed there are 2 problem. For th

Re: Flink job hangs using rocksDb as backend

2018-07-12 Thread shishal singh
Thanks Stefan/Stephan/Nico,

Indeed there are 2 problems. For the 2nd problem, I am almost certain that the
explanation given by Stephan is the right one, as in my case the number of
timers is in the millions (each for a different key, so I guess coalescing is
not an option for me).

To simplify my problem: each day I receive millions of events (10-20M), and
for each one I have to schedule a timer for 8 AM the next day to check whether
a matching event has arrived; if not, I have to send it to the Elastic sink as
an alert. I suspected that having so many timers fire at the same time could
cause my job to hang, so I am now scheduling the timers randomly between 8 AM
and 10 AM (a sketch of what I mean is below). But my job still hangs after
some time. One more thing I noticed: when my job hangs, CPU utilization shoots
to almost 100%.
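
Roughly, the per-key logic looks like this (a simplified sketch, not my actual
code; the event type, state name and the matching check are placeholders, and
it assumes event-time timestamps are assigned upstream):

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class NextDayCheckFunction extends ProcessFunction<String, String> {

    private transient ValueState<Boolean> matched;

    @Override
    public void open(Configuration parameters) {
        // Keyed state: has the matching event for this key been seen?
        matched = getRuntimeContext().getState(
                new ValueStateDescriptor<>("matched", Boolean.class));
    }

    @Override
    public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
        if (isMatchingEvent(event)) {
            matched.update(true);
            return;
        }
        // Register a timer at 8 AM (UTC) the next day, plus up to 2 hours of random
        // jitter so that not all timers fire at exactly the same instant.
        LocalDate nextDay = Instant.ofEpochMilli(ctx.timestamp())
                .atZone(ZoneOffset.UTC).toLocalDate().plusDays(1);
        long eightAm = nextDay.atTime(LocalTime.of(8, 0)).toInstant(ZoneOffset.UTC).toEpochMilli();
        long jitter = ThreadLocalRandom.current().nextLong(2 * 60 * 60 * 1000L);
        ctx.timerService().registerEventTimeTimer(eightAm + jitter);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // No matching event arrived before the timer fired -> emit an alert record.
        if (matched.value() == null) {
            out.collect("ALERT: no matching event before " + timestamp);
        }
    }

    private boolean isMatchingEvent(String event) {
        return event.contains("COMPLETE"); // placeholder check
    }
}

This function is applied after a keyBy(...) on the event key, and the alerts go
to the print()/Elastic sink.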
I tried to isolate the problem by removing the ES sink and just doing
stream.print(), and yet the problem persists.

In my current setup, I am running a standalone cluster of 3 machines (all
three servers have a TaskManager, the JobManager and Hadoop on them), so I am
not using EBS for RocksDB.

I also verified that when the job hangs, even the timers are not being called
(I have debug statements in the timers), and the only logs I see at that time
are the following:

2018-07-12 14:35:30,423 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x2648355f7c6010f after 11ms
2018-07-12 14:35:31,957 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:36,946 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:41,963 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:43,775 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x2648355f7c6010f after 10ms
2018-07-12 14:35:46,946 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:51,954 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:56,967 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:57,127 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x2648355f7c6010f after 8ms
2018-07-12 14:36:01,944 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:36:06,955 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:36:08,287 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Receiver TriggerCheckpoint 155@1531398968248 for d9af2f1da87b7268cc03e152a6179eae.
2018-07-12 14:36:08,287 DEBUG org.apache.flink.runtime.taskmanager.Task  - Invoking async call Checkpoint Trigger for Source: Event Source -> filter (1/1) (d9af2f1da87b7268cc03e152a6179eae). on task Source: Event Source -> filter (1/1)
2018-07-12 14:36:10,476 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x2648355f7c6010f after 10ms
2018-07-12 14:36:11,957 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager

As I expected, checkpoints also start to fail during this time.

My job graph is pretty simple: Source --> filter --> Sink


Regards,
Shishal


On Thu, Jul 12, 2018 at 9:54 AM Stefan Richter wrote:

> Hi,
>
> adding to what has already been said, I think that there can be two
> orthogonal problems here: i) why is your job slowing down/getting stuck?
> and ii) why is cancellation blocked? As for ii), I think Stephan already
> gave the right reason: shutdown can take longer, and that is what gets
> the TM killed.
>
> A more interesting question could still be i), why is your job slowing
> down until shutdown in the first place. I have two questions here. First,
> are you running RocksDB on EBS volumes? Then please have a look at this
> thread [1], because there can be some performance pitfalls. Second, how many
> timers are you expecting, and how are they firing? For example, if you have
> a huge amount of timers and the watermark makes a big jump, there is a
> possibility that it takes a while until the job makes progress because it
> has to handle so many timer callbacks first. Metrics for event throughput
> and from your I/O subsystem could be helpful to see if something is
> stuck/underperforming or if there is just a lot of timer processing going
> on.
>
> Best,
> Stefan
>
> [1]
> https://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3ccakhqddzamdqewiz5b1qndqv4+-mtvefhbhewrpxftlu7dv9...@mail.gmail.com%3E
>

Flink job hangs using rocksDb as backend

2018-07-11 Thread shishal
Hi,

I am using Flink 1.4.2 with RocksDB as the backend. I am using a process
function with timers on event time. For checkpointing I am using HDFS.

I am doing load testing, so I am reading Kafka from the beginning (approx. 7
days of data with 50M events).

My job gets stuck after approx. 20 min with no error. Thereafter the watermark
does not progress and all checkpoints fail.

Also, when I try to cancel my job (using the web UI), it takes several minutes
to finally get cancelled, and it brings the TaskManager down as well.

There are no logs while my job hangs, but while cancelling I get the following
error:

2018-07-11 09:10:39,385 ERROR org.apache.flink.runtime.taskmanager.TaskManager  -
==
==  FATAL  ===
==

A fatal error occurred, forcing the TaskManager to shut down: Task 'process
(3/6)' did not react to cancelling signal in the last 30 seconds, but is
stuck in method:
 org.rocksdb.RocksDB.get(Native Method)
org.rocksdb.RocksDB.get(RocksDB.java:810)
org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
java.lang.Thread.run(Thread.java:748)

2018-07-11 09:10:39,390 DEBUG org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy  - Actor was killed. Stopping it now.
akka.actor.ActorKilledException: Kill
2018-07-11 09:10:39,407 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Stopping TaskManager akka://flink/user/taskmanager#-1231617791.
2018-07-11 09:10:39,408 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Cancelling all computations and discarding all cached data.
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task  - Attempting to fail task externally process (3/6) (432fd129f3eea363334521f8c8de5198).
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task  - Task process (3/6) is already in state CANCELING
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task  - Attempting to fail task externally process (4/6) (7c6b96c9f32b067bdf8fa7c283eca2e0).
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task  - Task process (4/6) is already in state CANCELING
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task  - Attempting to fail task externally process (2/6) (a4f731797a7ea210fd0b512b0263bcd9).
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task  - Task process (2/6) is already in state CANCELING
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task  - Attempting to fail task externally process (1/6) (cd8a113779a4c00a051d78ad63bc7963).
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task  - Task process (1/6) is already in state CANCELING
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Disassociating from JobManager
2018-07-11 09:10:39,412 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB cache
2018-07-11 09:10:39,431 INFO  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB cache
2018-07-11 09:10:39,444 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService.
2018-07-11 09:10:39,444 DEBUG org.apache.flink.runtime

Re: Collect event which arrive after watermark

2018-04-04 Thread shishal
Thanks Fabian. My understanding was that late events older than the watermark
are dropped, so the processFunction would not be called for late events. I
guess my understanding was wrong, or is there something more to it?





Collect event which arrive after watermark

2018-04-04 Thread shishal
Hi Flink community members,

I am new to Flink stream processing. I am using event-time processing and
keyed streams.

Sorry if my question sounds silly, but is there a way to collect (or log) the
late events which arrive after the watermark? I need to gather these stats for
further analysis.
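
To make it concrete, this is the kind of thing I am after (a rough sketch of
one possible approach I can imagine, using a ProcessFunction and a side
output; the String event type and the tag name are just placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class LateEventSplitter {

    // Anonymous subclass so the element type information is kept.
    public static final OutputTag<String> LATE_EVENTS = new OutputTag<String>("late-events") {};

    public static SingleOutputStreamOperator<String> split(DataStream<String> input) {
        return input.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                Long ts = ctx.timestamp();
                if (ts != null && ts < ctx.timerService().currentWatermark()) {
                    // Element timestamp is behind the watermark -> collect it as late.
                    ctx.output(LATE_EVENTS, value);
                } else {
                    out.collect(value);
                }
            }
        });
    }
}

The late stream could then be pulled out with
split(events).getSideOutput(LateEventSplitter.LATE_EVENTS) and printed or
written out for analysis.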

Thanks,
Shishal







Flink CEP detect multiple patterns within their specific time windows

2018-03-23 Thread shishal
Hi,

I have 2 different patterns: the 1st pattern should appear within 4 hours and
the other pattern should appear within 24 hours.

I could create 2 separate jobs on the Flink cluster, but is it possible to use
the same job to detect both patterns, each within its own time window, since
the source stream and the Elastic sink are the same for both? Something like
the sketch below is what I have in mind.
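
(A rough sketch with made-up event/field names, just to illustrate the shape
of the job I am thinking of; it is not working code from my project.)

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoPatternsSketch {

    // Placeholder event type; getType() is an assumed accessor.
    public static class MyEvent {
        public String type;
        public String getType() { return type; }
    }

    // Helper that builds "startType followed by endType within the given window".
    private static Pattern<MyEvent, ?> pattern(String startType, String endType, Time window) {
        return Pattern.<MyEvent>begin("start")
                .where(new SimpleCondition<MyEvent>() {
                    @Override
                    public boolean filter(MyEvent e) { return startType.equals(e.getType()); }
                })
                .followedBy("end")
                .where(new SimpleCondition<MyEvent>() {
                    @Override
                    public boolean filter(MyEvent e) { return endType.equals(e.getType()); }
                })
                .within(window);
    }

    public static DataStream<String> detect(DataStream<MyEvent> events) {
        DataStream<String> fourHourAlerts = CEP.pattern(events, pattern("A", "B", Time.hours(4)))
                .select(new PatternSelectFunction<MyEvent, String>() {
                    @Override
                    public String select(Map<String, List<MyEvent>> match) {
                        return "4h pattern matched: " + match.keySet();
                    }
                });

        DataStream<String> oneDayAlerts = CEP.pattern(events, pattern("C", "D", Time.hours(24)))
                .select(new PatternSelectFunction<MyEvent, String>() {
                    @Override
                    public String select(Map<String, List<MyEvent>> match) {
                        return "24h pattern matched: " + match.keySet();
                    }
                });

        // Both alert streams can go to the same Elasticsearch sink after a union.
        return fourHourAlerts.union(oneDayAlerts);
    }
}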

Regards,
Shishal





Re: Flink CEP window for 1 working day

2018-03-20 Thread shishal
Thanks Fabian,

By non-working day I mean that I have a list of non-working days in the year,
which I can use for comparison.
I am very new to Flink and Flink CEP. Initially I thought there was a way to
set the within(time) value dynamically, but now I guess that is not possible.

If I understand correctly, the way around this is to somehow manipulate the
stream (probably using a ProcessFunction), and maybe manipulate the event
time.

Can you please give me more pointers on how to go ahead with this? Please note
that I am using event-time processing.
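
For what it is worth, the working-day arithmetic itself I can do in plain Java,
something like the sketch below (the non-working-day list is a placeholder);
my question is really about how to plug this into the event-time/CEP side.

import java.time.DayOfWeek;
import java.time.LocalDate;
import java.util.Set;

public class WorkingDays {

    // Returns the date that is `days` working days after `start`,
    // skipping weekends and the supplied set of non-working days.
    public static LocalDate addWorkingDays(LocalDate start, int days, Set<LocalDate> nonWorkingDays) {
        LocalDate date = start;
        int added = 0;
        while (added < days) {
            date = date.plusDays(1);
            boolean weekend = date.getDayOfWeek() == DayOfWeek.SATURDAY
                    || date.getDayOfWeek() == DayOfWeek.SUNDAY;
            if (!weekend && !nonWorkingDays.contains(date)) {
                added++;
            }
        }
        return date;
    }
}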

Your help is much appreciated.
Thanks





Flink CEP window for 1 working day

2018-03-20 Thread shishal
I am using Flink CEP, and to match an event pattern within a given time window
we use *.within(Time.days(1))*.

Now in one of the cases I need to wait for 1 working day instead of 1 calendar
day. Is there any way to do that in Flink CEP?





design flink cep pattern for transaction life cycle

2018-03-14 Thread shishal singh
(Cross-posted on Stack Overflow:
https://stackoverflow.com/questions/49273875/design-flink-cep-pattern-for-transaction#)

I have a scenario where I need to detect the following pattern for a
transaction.

My basic transaction event looks like this => *Transaction(id, status, time)*

A transaction goes through the following states:

BEGIN -> COMPLETE (within 5 day, if this comes then no need to wait
for PROCESSING)
BEGIN -> PROCESSING (within 1 day)
PROCESSING -> PROCESSING (within 1 day, could happen upto 5 days)
PROCESSING -> COMPLETE (within 1 day)

In plain text: when a transaction is initiated, it can go from BEGIN to
COMPLETE within 5 days. But if it cannot be completed within 1 day, it keeps
sending a PROCESSING event each day. This can happen for up to 5 days, and
then finally it sends the COMPLETE event.

I want to alert if there is any deviation from this pattern.

The number of transactions in a day could be 10-50 million. I am evaluating
Flink CEP for this problem. I have gone through the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html

Is it possible to design this pattern using Flink CEP?

Also, if I am not wrong, I need to use PatternTimeoutFunction to trigger the
alert.

Regards,
Shishal


Flink CEP pattern with multiple temporal constraint

2018-03-14 Thread shishal singh
Hi Team,

I have a scenario where I need to detect the following pattern for a
transaction.

My basic transaction event looks like this => *Transaction(id, status,
eventTime)*

A transaction goes through the following states:

*BEGIN -> COMPLETE (within 5 day, if this comes then no need to wait for
PROCESSING)*
*BEGIN -> PROCESSING (within 1 day)*
*PROCESSING -> PROCESSING (within 1 day, could happen upto 5 days)*
*PROCESSING -> COMPLETE (within 1 day)*


In plain text: when a transaction is initiated, it can go from BEGIN to
COMPLETE within 5 days. But if it cannot be completed within 1 day, it keeps
sending a PROCESSING event each day. This can happen for up to 5 days, and
then finally it sends the COMPLETE event.

I want to alert per transaction if there is any deviation from this pattern.

The number of transactions in a day could be 10-50 million. I am looking at
Flink CEP for this problem. I have gone through the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html

But I am still not sure how to design this pattern. I hope it is possible to
design such a pattern with Flink CEP?

Also, if I am not wrong, I need to use PatternTimeoutFunction to trigger the
alert. A rough sketch of how I currently picture it is below.
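
(This is only a sketch with made-up class/field names, not working code; as far
as I can tell it only expresses the overall 5-day window via within(), not the
per-step 1-day constraints, which is exactly where I am unsure.)

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Either;

public class TransactionLifecycleSketch {

    // Placeholder event type matching Transaction(id, status, eventTime).
    public static class Transaction {
        public String id;
        public String status;
        public long eventTime;
        public String getStatus() { return status; }
    }

    private static SimpleCondition<Transaction> status(String expected) {
        return new SimpleCondition<Transaction>() {
            @Override
            public boolean filter(Transaction t) { return expected.equals(t.getStatus()); }
        };
    }

    // `transactions` is assumed to be keyed by the transaction id already.
    public static DataStream<Either<String, String>> detect(DataStream<Transaction> transactions) {
        Pattern<Transaction, ?> lifecycle = Pattern.<Transaction>begin("begin")
                .where(status("BEGIN"))
                .followedBy("processing").where(status("PROCESSING"))
                .oneOrMore().optional()          // 0..n PROCESSING events
                .followedBy("complete").where(status("COMPLETE"))
                .within(Time.days(5));           // overall deadline only

        return CEP.pattern(transactions, lifecycle).select(
                new PatternTimeoutFunction<Transaction, String>() {
                    @Override
                    public String timeout(Map<String, List<Transaction>> partialMatch, long timeoutTs) {
                        // The 5-day window expired without a COMPLETE -> raise an alert.
                        return "ALERT: lifecycle not completed, partial match = " + partialMatch.keySet();
                    }
                },
                new PatternSelectFunction<Transaction, String>() {
                    @Override
                    public String select(Map<String, List<Transaction>> match) {
                        return "OK: " + match.get("begin").get(0).id;
                    }
                });
    }
}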

Thanks,
Shishal