CEP and Within Clause

2016-07-26 Thread Sameer W
Hi,

It looks like the within clause of CEP uses tumbling windows. I could get
it to behave like sliding windows by using an upstream pipeline that applies
sliding windows and produces the repeated elements (one copy per sliding
window), and then applying a watermark assigner on the resulting stream of
duplicated elements. I wanted to use the "followedBy" pattern, where there
is a strong need for sliding windows.

Is there a plan to add sliding windows to the within clause at some point?

The PatternStream class's "select" and "flatSelect" have overloaded
versions which take a PatternTimeoutFunction. Is there a way to insert some
of those timed-out elements back at the front of the stream? Say I am trying
to find a pattern where two temperature readings >150 within a 6-second
window should raise an alert. If only one was found, can I insert that one
back at the front of the stream on that task node (for that window pane) so
that I can find a pattern match in the events occurring in the next 6
seconds? If I can do that, I don't need sliding windows. Otherwise I cannot
avoid using them for such scenarios.
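
For reference, a minimal sketch of the two-readings-within-6-seconds pattern
described above (TemperatureEvent, its getTemperature() accessor and the
"readings" stream are hypothetical, and the argument type of where(...)
differs between CEP releases, so treat this as an outline rather than exact
API):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

Pattern<TemperatureEvent, ?> alertPattern = Pattern
    .<TemperatureEvent>begin("first")
    .where(new FilterFunction<TemperatureEvent>() {
        @Override
        public boolean filter(TemperatureEvent e) {
            return e.getTemperature() > 150;  // first hot reading
        }
    })
    .followedBy("second")
    .where(new FilterFunction<TemperatureEvent>() {
        @Override
        public boolean filter(TemperatureEvent e) {
            return e.getTemperature() > 150;  // second hot reading
        }
    })
    .within(Time.seconds(6));                 // both readings within 6 seconds

PatternStream<TemperatureEvent> alerts =
    CEP.pattern(readings.keyBy("sensorId"), alertPattern);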

Thanks,
Sameer


stop then start the job, how to load latest checkpoints automatically?

2016-07-26 Thread Shaosu Liu
I want to load previous state and I understand I could do this by
specifying a savepoint.
Is there a way to do this automatically, given that I do not change my code
(jar)?

-- 
Cheers,
Shaosu


Re: .so linkage error in Cluster

2016-07-26 Thread Ufuk Celebi
Out of curiosity I've tried this locally by adding the following
dependencies to my Maven project:


<dependency>
   <groupId>org.bytedeco</groupId>
   <artifactId>javacpp</artifactId>
   <version>1.2.2</version>
</dependency>

<dependency>
   <groupId>org.bytedeco.javacpp-presets</groupId>
   <artifactId>opencv</artifactId>
   <version>3.1.0-1.2</version>
</dependency>


With this, running mvn clean package works as expected.



On Tue, Jul 26, 2016 at 7:09 PM, Ufuk Celebi  wrote:
> What error message to you get from Maven?
>
> On Tue, Jul 26, 2016 at 4:39 PM, Debaditya Roy  wrote:
>> Hello,
>>
>> I am using the jar builder from IntelliJ IDE (the mvn one was causing
>> problems). After that I executed it successfully locally. But in remote it
>> is causing problem.
>>
>> Warm Regards,
>> Debaditya
>>
>> On Tue, Jul 26, 2016 at 4:36 PM, Ufuk Celebi  wrote:
>>>
>>> Yes, the BlobCache on each TaskManager node should fetch it from the
>>> JobManager. How are you packaging your JAR?
>>>
>>> On Tue, Jul 26, 2016 at 4:32 PM, Debaditya Roy 
>>> wrote:
>>> > Hello users,
>>> >
>>> > I am having a problem while running my flink program in a cluster. It
>>> > gives
>>> > me an error that it is unable to find an .so file in a tmp directory.
>>> >
>>> > Caused by: java.lang.UnsatisfiedLinkError: no jniopencv_core in
>>> > java.library.path
>>> > at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>>> > at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>>> > at java.lang.System.loadLibrary(System.java:1122)
>>> > at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:654)
>>> > at org.bytedeco.javacpp.Loader.load(Loader.java:492)
>>> > at org.bytedeco.javacpp.Loader.load(Loader.java:409)
>>> > at org.bytedeco.javacpp.opencv_core.(opencv_core.java:10)
>>> > at java.lang.Class.forName0(Native Method)
>>> > at java.lang.Class.forName(Class.java:348)
>>> > at org.bytedeco.javacpp.Loader.load(Loader.java:464)
>>> > at org.bytedeco.javacpp.Loader.load(Loader.java:409)
>>> > at
>>> >
>>> > org.bytedeco.javacpp.helper.opencv_core$AbstractArray.(opencv_core.java:109)
>>> > at loc.video.FlinkStreamSource.run(FlinkStreamSource.java:95)
>>> > at
>>> >
>>> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>> > at
>>> >
>>> > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>> > at
>>> >
>>> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> > at java.lang.Thread.run(Thread.java:745)
>>> > Caused by: java.lang.UnsatisfiedLinkError:
>>> > /tmp/javacpp5400264496782/libjniopencv_core.so: libgomp.so.1: cannot
>>> > open
>>> > shared object file: No such file or directory
>>> > at java.lang.ClassLoader$NativeLibrary.load(Native Method)
>>> > at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
>>> > at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
>>> > at java.lang.Runtime.load0(Runtime.java:809)
>>> > at java.lang.System.load(System.java:1086)
>>> > at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:637)
>>> >
>>> >
>>> > I searched for the temp directory and in one of the nodes this directory
>>> > and
>>> > the .jar file was present. Is it required to have the file across all
>>> > the
>>> > nodes? If yes is there any way to control it? Since this tmp directory
>>> > and
>>> > the .so file gets extracted during the runtime without any external
>>> > manipulation.
>>> >
>>> >
>>> > Thanks in advance.
>>> >
>>> > Regards,
>>> > Debaditya
>>
>>


Re: .so linkage error in Cluster

2016-07-26 Thread Ufuk Celebi
What error message do you get from Maven?

On Tue, Jul 26, 2016 at 4:39 PM, Debaditya Roy  wrote:
> Hello,
>
> I am using the jar builder from IntelliJ IDE (the mvn one was causing
> problems). After that I executed it successfully locally. But in remote it
> is causing problem.
>
> Warm Regards,
> Debaditya
>
> On Tue, Jul 26, 2016 at 4:36 PM, Ufuk Celebi  wrote:
>>
>> Yes, the BlobCache on each TaskManager node should fetch it from the
>> JobManager. How are you packaging your JAR?
>>
>> On Tue, Jul 26, 2016 at 4:32 PM, Debaditya Roy 
>> wrote:
>> > Hello users,
>> >
>> > I am having a problem while running my flink program in a cluster. It
>> > gives
>> > me an error that it is unable to find an .so file in a tmp directory.
>> >
>> > Caused by: java.lang.UnsatisfiedLinkError: no jniopencv_core in
>> > java.library.path
>> > at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>> > at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>> > at java.lang.System.loadLibrary(System.java:1122)
>> > at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:654)
>> > at org.bytedeco.javacpp.Loader.load(Loader.java:492)
>> > at org.bytedeco.javacpp.Loader.load(Loader.java:409)
>> > at org.bytedeco.javacpp.opencv_core.(opencv_core.java:10)
>> > at java.lang.Class.forName0(Native Method)
>> > at java.lang.Class.forName(Class.java:348)
>> > at org.bytedeco.javacpp.Loader.load(Loader.java:464)
>> > at org.bytedeco.javacpp.Loader.load(Loader.java:409)
>> > at
>> >
>> > org.bytedeco.javacpp.helper.opencv_core$AbstractArray.(opencv_core.java:109)
>> > at loc.video.FlinkStreamSource.run(FlinkStreamSource.java:95)
>> > at
>> >
>> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>> > at
>> >
>> > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>> > at
>> >
>> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> > at java.lang.Thread.run(Thread.java:745)
>> > Caused by: java.lang.UnsatisfiedLinkError:
>> > /tmp/javacpp5400264496782/libjniopencv_core.so: libgomp.so.1: cannot
>> > open
>> > shared object file: No such file or directory
>> > at java.lang.ClassLoader$NativeLibrary.load(Native Method)
>> > at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
>> > at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
>> > at java.lang.Runtime.load0(Runtime.java:809)
>> > at java.lang.System.load(System.java:1086)
>> > at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:637)
>> >
>> >
>> > I searched for the temp directory and in one of the nodes this directory
>> > and
>> > the .jar file was present. Is it required to have the file across all
>> > the
>> > nodes? If yes is there any way to control it? Since this tmp directory
>> > and
>> > the .so file gets extracted during the runtime without any external
>> > manipulation.
>> >
>> >
>> > Thanks in advance.
>> >
>> > Regards,
>> > Debaditya
>
>


Re: Performance issues with GroupBy?

2016-07-26 Thread Greg Hogan
Hi Robert,

Are you able to simplify your function input/output types? Flink
aggressively serializes the data stream, and complex types such as ArrayList
and BitSet will be much slower to process. Could you restructure the lists
so that the grouping is done on individual elements instead?

Greg

On Mon, Jul 25, 2016 at 8:06 AM, Paschek, Robert <
robert.pasc...@tu-berlin.de> wrote:

> Hi Mailing List,
>
>
>
> i actually do some benchmarks with different algorithms. The System has 8
> nodes and a configured parallelism of 48 - The IBM-Power-1 cluster, if
> somebody from the TU Berlin read this : - ) – and to “simulate” Hadoop
> MapReduce, the execution mode is set to “BATCH_FORCED”
>
>
>
> It is suspicious, that three of the six algorithms had a big gap in
> runtime (5000ms vs. 2ms) for easy (low dim) tuple. Additionally the
> algorithms in the “upper” group using a groupBy transformation and the
> algorithms in the “lower” group don’t use groupBy.
>
> I attached the plot for better visualization.
>
>
>
> I also checked the logs, especially the time, when the mappers finishing
> and the reducers start _iterating_ - they hardened my speculation.
>
>
>
> So my question is, if it is “normal”, that grouping is so cost-intensive
> that – in my case – the runtime increases by 4 times?
>
> I have data from the same experiments running on a 13 nodes cluster with
> 26 cores with Apache Hadoop MapReduce, where the gap is still present, but
> smaller (50s vs 57s or 55s vs 65s).
>
>
>
> Is there something I might could do to optimize the grouping? Some
> codesnipplets:
>
>
>
> The Job:
> DataSet output = input
>
> .mapPartition(new
> MR_GPMRS_Mapper()).withBroadcastSet(metaData, "MetaData").withParameters(
> parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_MAPPER")
>
> .groupBy(0)
>
> .reduceGroup(new MR_GPMRS_Reducer()).returns(
> input.getType()).withBroadcastSet(metaData, "MetaData").withParameters(
> parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_REDUCER");
>
>
>
> MR_GPMRS_Mapper():
>
> public class MR_GPMRS_Mapper  extends
> RichMapPartitionFunction,
> BitSet, BitSet>>>
>
>
>
> MR_GPMRS_Reducer():
>
> public class MR_GPMRS_Reducer  extends
> RichGroupReduceFunction,
> BitSet, BitSet>>, T>
>
>
>
> The Tuple2 has as Payload on position f1 the Tuple3 and on position f0 the
> Integer Key for grouping.
>
>
>
> Any suggestions (or comments, that it is a “normal” behaviour) are welcome
> : - )
>
>
>
> Thank you in advance!
>
> Robert
>


Re: .so linkage error in Cluster

2016-07-26 Thread Debaditya Roy
Hello,

I am using the jar builder from the IntelliJ IDE (the mvn one was causing
problems). After that I executed it successfully locally, but on the remote
cluster it is causing a problem.

Warm Regards,
Debaditya

On Tue, Jul 26, 2016 at 4:36 PM, Ufuk Celebi  wrote:

> Yes, the BlobCache on each TaskManager node should fetch it from the
> JobManager. How are you packaging your JAR?
>
> On Tue, Jul 26, 2016 at 4:32 PM, Debaditya Roy 
> wrote:
> > Hello users,
> >
> > I am having a problem while running my flink program in a cluster. It
> gives
> > me an error that it is unable to find an .so file in a tmp directory.
> >
> > Caused by: java.lang.UnsatisfiedLinkError: no jniopencv_core in
> > java.library.path
> > at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
> > at java.lang.Runtime.loadLibrary0(Runtime.java:870)
> > at java.lang.System.loadLibrary(System.java:1122)
> > at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:654)
> > at org.bytedeco.javacpp.Loader.load(Loader.java:492)
> > at org.bytedeco.javacpp.Loader.load(Loader.java:409)
> > at org.bytedeco.javacpp.opencv_core.(opencv_core.java:10)
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:348)
> > at org.bytedeco.javacpp.Loader.load(Loader.java:464)
> > at org.bytedeco.javacpp.Loader.load(Loader.java:409)
> > at
> >
> org.bytedeco.javacpp.helper.opencv_core$AbstractArray.(opencv_core.java:109)
> > at loc.video.FlinkStreamSource.run(FlinkStreamSource.java:95)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.UnsatisfiedLinkError:
> > /tmp/javacpp5400264496782/libjniopencv_core.so: libgomp.so.1: cannot open
> > shared object file: No such file or directory
> > at java.lang.ClassLoader$NativeLibrary.load(Native Method)
> > at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
> > at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
> > at java.lang.Runtime.load0(Runtime.java:809)
> > at java.lang.System.load(System.java:1086)
> > at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:637)
> >
> >
> > I searched for the temp directory and in one of the nodes this directory
> and
> > the .jar file was present. Is it required to have the file across all the
> > nodes? If yes is there any way to control it? Since this tmp directory
> and
> > the .so file gets extracted during the runtime without any external
> > manipulation.
> >
> >
> > Thanks in advance.
> >
> > Regards,
> > Debaditya
>


Re: .so linkage error in Cluster

2016-07-26 Thread Ufuk Celebi
Yes, the BlobCache on each TaskManager node should fetch it from the
JobManager. How are you packaging your JAR?

On Tue, Jul 26, 2016 at 4:32 PM, Debaditya Roy  wrote:
> Hello users,
>
> I am having a problem while running my flink program in a cluster. It gives
> me an error that it is unable to find an .so file in a tmp directory.
>
> Caused by: java.lang.UnsatisfiedLinkError: no jniopencv_core in
> java.library.path
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
> at java.lang.Runtime.loadLibrary0(Runtime.java:870)
> at java.lang.System.loadLibrary(System.java:1122)
> at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:654)
> at org.bytedeco.javacpp.Loader.load(Loader.java:492)
> at org.bytedeco.javacpp.Loader.load(Loader.java:409)
> at org.bytedeco.javacpp.opencv_core.(opencv_core.java:10)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.bytedeco.javacpp.Loader.load(Loader.java:464)
> at org.bytedeco.javacpp.Loader.load(Loader.java:409)
> at
> org.bytedeco.javacpp.helper.opencv_core$AbstractArray.(opencv_core.java:109)
> at loc.video.FlinkStreamSource.run(FlinkStreamSource.java:95)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsatisfiedLinkError:
> /tmp/javacpp5400264496782/libjniopencv_core.so: libgomp.so.1: cannot open
> shared object file: No such file or directory
> at java.lang.ClassLoader$NativeLibrary.load(Native Method)
> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
> at java.lang.Runtime.load0(Runtime.java:809)
> at java.lang.System.load(System.java:1086)
> at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:637)
>
>
> I searched for the temp directory and in one of the nodes this directory and
> the .jar file was present. Is it required to have the file across all the
> nodes? If yes is there any way to control it? Since this tmp directory and
> the .so file gets extracted during the runtime without any external
> manipulation.
>
>
> Thanks in advance.
>
> Regards,
> Debaditya


.so linkage error in Cluster

2016-07-26 Thread Debaditya Roy
Hello users,

I am having a problem while running my Flink program in a cluster. It gives
me an error that it is unable to find an .so file in a tmp directory.

Caused by: java.lang.UnsatisfiedLinkError: no jniopencv_core in
java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:654)
at org.bytedeco.javacpp.Loader.load(Loader.java:492)
at org.bytedeco.javacpp.Loader.load(Loader.java:409)
at org.bytedeco.javacpp.opencv_core.(opencv_core.java:10)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.bytedeco.javacpp.Loader.load(Loader.java:464)
at org.bytedeco.javacpp.Loader.load(Loader.java:409)
at
org.bytedeco.javacpp.helper.opencv_core$AbstractArray.(opencv_core.java:109)
at loc.video.FlinkStreamSource.run(FlinkStreamSource.java:95)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsatisfiedLinkError:
/tmp/javacpp5400264496782/libjniopencv_core.so: libgomp.so.1: cannot open
shared object file: No such file or directory
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:637)


I searched for the temp directory, and on one of the nodes this directory
and the .jar file were present. Is it required to have the file across all
the nodes? If yes, is there any way to control it, since this tmp directory
and the .so file get extracted at runtime without any external
manipulation?


Thanks in advance.

Regards,
Debaditya


Re: Supporting Multi-tenancy in a Flink

2016-07-26 Thread Gyula Fóra
Hi,

Well, what we are doing at King is trying to solve a similar problem. It
would be great if you could read the blogpost because it goes into detail
about the actual implementation but let me recap here quickly:

We are building a stream processing system that data scientists and other
developers at King share, in a way that they can use it through a simple web
interface without knowing any operational details. The stream processing
system itself is one complex Flink job that receives the events and executes
the user scripts/jobs, which are written in a higher-level DSL.

The DSL is designed so that we can execute the operations in a fixed
streaming topology instead of having to dynamically deploy new jobs for
every new script. Both scripts and events are sent through Kafka, so this
makes our backend Flink job naturally multi-tenant. This is of course not
always appropriate, as there is no resource isolation between individual
scripts, but we can work around this by dedicating backend jobs to different
teams.

Let me know if this helps!
Gyula

Aparup Banerjee (apbanerj) wrote on Tue, Jul 26, 2016 at 14:50:

> Thanks.
>
> Hi Gyula anything , you can share on this?
>
> Aparup
>
>
>
>
> On 7/26/16, 4:38 AM, "Ufuk Celebi"  wrote:
>
> >On Mon, Jul 25, 2016 at 5:38 AM, Aparup Banerjee (apbanerj)
> > wrote:
> >> We are building a Stream processing system using Apache beam on top of
> Flink
> >> using the Flink Runner. Our pipelines take Kafka streams as sources ,
> and
> >> can write to multiple sinks. The system needs to be tenant aware.
> Tenants
> >> can share same Kafka topic. Tenants can write their own pipelines. We are
> >> providing a small framework to write pipelines (on top of beam),  so we
> have
> >> control of what data stream is available to pipeline developer. I am
> looking
> >> for some strategies for following :
> >>
> >> How can I partition / group the data in a way that pipeline developers
> don’t
> >> need to care about tenancy , but the data integrity is maintained ?
> >> Ways in which I can assign compute(work nodes for e.g) to different jobs
> >> based on Tenant configuration.
> >
> >There is no built-in support for this in Flink, but King.com worked on
> >something similar using custom operators. You can check out the blog
> >post here:
> https://techblog.king.com/rbea-scalable-real-time-analytics-king/
> >
> >I'm pulling in Gyula (cc'd) who worked on the implementation at King...
>


Re: Supporting Multi-tenancy in a Flink

2016-07-26 Thread Aparup Banerjee (apbanerj)
Thanks. 

Hi Gyula, anything you can share on this?

Aparup




On 7/26/16, 4:38 AM, "Ufuk Celebi"  wrote:

>On Mon, Jul 25, 2016 at 5:38 AM, Aparup Banerjee (apbanerj)
> wrote:
>> We are building a Stream processing system using Apache beam on top of Flink
>> using the Flink Runner. Our pipelines take Kafka streams as sources , and
>> can write to multiple sinks. The system needs to be tenant aware. Tenants
>> can share same Kafka topic. Tenants can write their own pipelines. We are
>> providing a small framework to write pipelines (on top of beam),  so we have
>> control of what data stream is available to pipeline developer. I am looking
>> for some strategies for following :
>>
>> How can I partition / group the data in a way that pipeline developers don’t
>> need to care about tenancy , but the data integrity is maintained ?
>> Ways in which I can assign compute(work nodes for e.g) to different jobs
>> based on Tenant configuration.
>
>There is no built-in support for this in Flink, but King.com worked on
>something similar using custom operators. You can check out the blog
>post here: https://techblog.king.com/rbea-scalable-real-time-analytics-king/
>
>I'm pulling in Gyula (cc'd) who worked on the implementation at King...


Re: Question about Checkpoint Storage (RocksDB)

2016-07-26 Thread Ufuk Celebi
On Tue, Jul 26, 2016 at 2:15 PM, Sameer W  wrote:
> 1. Calling clear() on the KV state is only possible for snapshots right? Do
> you control that for checkpoints too.

What do you mean by snapshots vs. checkpoints exactly?

> 2. Assuming that the user has no control over the checkpoint process outside
> of controlling the checkpoint interval , when is the RocksDB cleared of the
> operator state for checkpoints after they are long past. It seems like there
> are only two checkpoints that are really necessary to maintain, the current
> one and the previous one for restore. Does Flink clean up checkpoints on a
> timer? When it does clean up checkpoints does it also clean up the state
> backend (I am assuming they are different).

Yes, here: 
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html

By default, only one completed checkpoint is kept.
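
(For reference, a minimal configuration sketch for the checkpoint interval
and the RocksDB backend discussed in this thread; the interval and the
checkpoint path are placeholders, and RocksDBStateBackend requires the
flink-statebackend-rocksdb dependency:)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static StreamExecutionEnvironment configure() throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Take a checkpoint every 60 seconds; completed checkpoints that are no
    // longer needed for recovery are discarded automatically.
    env.enableCheckpointing(60_000);

    // Keep working state in RocksDB and copy snapshots to a durable
    // filesystem (placeholder URI).
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    return env;
}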

> 3. The pre-aggregating windows was very helpful as the WindowFunction is now
> passed the pre-aggregated state. For windows, are the Reduce and Fold
> functions called on each element event before the window is triggered. I can
> see how that would work where the pre-compute is done per element but the
> actual output is emitted only when the window is fired. But that is only
> possible if there are no Evictors defined on the window? Also how are the
> elements fed to the Reduce/Fold function. Is it like MapReduce where even if
> you are using a Iterator, in reality all the values for a key are not
> buffered into memory? Which ties back to how is RocksDB is used to store a
> large window state before it is triggered. If my elements are accumulating
> in a window (serving a ReduceFunction) does it spill to disk (RocksDB?) when
> a threshold size is reached?

- The function is called before adding the element to the window KV state
- Yes, this is only possible if no evictors are defined
- The window reduce function is applied directly to the elements of
the stream and then updates the KvState instance (e.g. updates RocksDB);
see the sketch below
- Operations on RocksDB state always touch RocksDB, which takes care of
spilling etc.
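
(A rough sketch of such a pre-aggregating window, assuming a hypothetical
Reading POJO with sensorId and count fields on an existing DataStream named
"readings"; only the running aggregate is kept in the window state:)

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Reading> hourlyCounts = readings
    .keyBy("sensorId")
    .timeWindow(Time.hours(1))
    .reduce(new ReduceFunction<Reading>() {
        @Override
        public Reading reduce(Reading a, Reading b) {
            // fold each arriving element into the running aggregate, so the
            // window state holds one Reading per key, not all elements
            return new Reading(a.getSensorId(), a.getCount() + b.getCount());
        }
    });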


Re: Question about Checkpoint Storage (RocksDB)

2016-07-26 Thread Sameer W
Thanks Ufuk,

That was very helpful. But that raised a few more questions :-):

1. Calling clear() on the KV state is only possible for snapshots, right?
Do you control that for checkpoints too?

2. Assuming that the user has no control over the checkpoint process
outside of controlling the checkpoint interval, when is RocksDB cleared of
the operator state for checkpoints that are long past? It seems like there
are only two checkpoints that are really necessary to maintain, the current
one and the previous one for restore. Does Flink clean up checkpoints on a
timer? When it does clean up checkpoints, does it also clean up the state
backend (I am assuming they are different)?

3. The pre-aggregating windows suggestion was very helpful, as the
WindowFunction is now passed the pre-aggregated state. For windows, are the
Reduce and Fold functions called on each element even before the window is
triggered? I can see how that would work where the pre-compute is done per
element but the actual output is emitted only when the window is fired. But
that is only possible if there are no Evictors defined on the window? Also,
how are the elements fed to the Reduce/Fold function? Is it like MapReduce
where, even if you are using an Iterator, in reality all the values for a
key are not buffered into memory? Which ties back to how RocksDB is used to
store a large window state before it is triggered. If my elements are
accumulating in a window (serving a ReduceFunction), does it spill to disk
(RocksDB?) when a threshold size is reached?

Thanks,
Sameer



On Tue, Jul 26, 2016 at 7:29 AM, Ufuk Celebi  wrote:

> On Mon, Jul 25, 2016 at 8:50 PM, Sameer W  wrote:
> > The question is, if using really long windows (in hours) if the state of
> the
> > window gets very large over time, would size of the RocksDB get larger?
> > Would replication to HDFS start causing performance bottlenecks? Also
> would
> > this need a constant (at checkpoint interval?), read from RocksDB, add
> more
> > window elements and write to RocksDB.
>
> Yes. The size of the RocksDB instance is directly correlated with the
> number of K/V state pairs you store. You can remove state by calling
> `clear()` on the KvState instance.
>
> All state updates go directly to RocksDB and snapshots copy the DB
> files (semi-async mode, current default) or iterate-and-copy all K/V
> pairs (fully-async mode). No records are deleted automatically after
> snapshots.
>
> Snapshotting large RocksDB instances will cause some slow down, but
> you can trade this cost off by adjusting the checkpointing interval.
> There are plans to do the snapshots in an incremental fashion in order
> to lower the costs for this, but there is no design doc available for
> it at this point.
>
> > Outside of the read costs, is there a risk to having very long windows
> when
> > you know you could collect a lot of elements in them. Instead is it
> safer to
> > perform aggregations on top of aggregations or use your own custom remote
> > store like HBase to persist larger state per record and use windows only
> to
> > store the keys in HBase. I mention HBase because of its support for
> column
> > qualifiers allow elements to be added to the same key in multiple ordered
> > column qualifiers. Reading can also be throttled in batches of column
> > qualifiers allowing for the better memory consumption. Is this approach
> used
> > in practice?
>
> RocksDB works quite well for large stateful jobs. If possible for your
> use case, I would still recommend work with pre-aggregating window
> functions (
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions
> )
> or pre-aggregating the data. The I/O costs will correlate with the
> state size, but there is "no risk" in the sense of that it will still
> work as expected.
>
> What you describe with HBase could work, but I'm not aware of someone
> doing this. Furhtermore, depending on your use case, it can cause
> problems in failure scenarios, because you might need to keep HBase
> and Flink state in sync.
>


Re: flink batch data processing

2016-07-26 Thread Ufuk Celebi
Are you using the DataSet or DataStream API?

Yes, most Flink transformations operate on single tuples, but you can
work around it:
- You could write a custom source function, which emits records that
contain X points
(https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#data-sources)
- You can use a mapPartition
(https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#mappartition)
or FlatMap 
(https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#flatmap)
function and create the batches manually (see the sketch below).
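
(A small sketch of the mapPartition route in the Java DataSet API; the batch
size of 2000 and the Double element type are taken from the question below
and are illustrative only:)

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

// Assumed: DataSet<Double> points read from some source.
DataSet<List<Double>> batches = points.mapPartition(
    new MapPartitionFunction<Double, List<Double>>() {
        @Override
        public void mapPartition(Iterable<Double> values,
                                 Collector<List<Double>> out) {
            List<Double> batch = new ArrayList<>();
            for (Double v : values) {
                batch.add(v);
                if (batch.size() == 2000) {  // emit fixed-size batches for the filter
                    out.collect(batch);
                    batch = new ArrayList<>();
                }
            }
            if (!batch.isEmpty()) {
                out.collect(batch);          // emit the trailing partial batch
            }
        }
    });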

Does this help?

On Fri, Jul 22, 2016 at 7:21 PM, Paul Joireman  wrote:
> I'm evaluating for some processing batches of data.  As a simple example say
> I have 2000 points which I would like to pass through an FIR filter using
> functionality provided by the Python scipy libraryjk.  The scipy filter is a
> simple function which accepts a set of coefficients and the data to filter
> and returns the data.   Is is possible to create a transformation to handle
> this in flink?  It seems flink transformations are applied on a point by
> point basis but I may be missing something.
>
> Paul


Re: Question about Apache Flink Use Case

2016-07-26 Thread Kostas Kloudas
Hi Suma Cherukuri,

I also replied to your question in the dev list, but I repeat the answer here
just in case you missed it.

From what I understand you have many small files and you want to 
aggregate them into bigger ones containing the logs of the last 24h.

As Max said RollingSinks will allow you to have exactly-once semantics
when writing your aggregated results to your FS.

As far as reading your input is concerned, Flink recently
integrated functionality to periodically monitor a directory, e.g. your
log directory, and process only the new files as they appear.

This will be part of the 1.1 release which is coming possibly during this 
week or the next, but you can always find it on the master branch.

The method that you need is:

readFile(FileInputFormat<OUT> inputFormat,
         String filePath,
         FileProcessingMode watchType,
         long interval,
         FilePathFilter filter,
         TypeInformation<OUT> typeInformation)

which allows you to specify the FileProcessingMode (which you should set to
FileProcessingMode.PROCESS_CONTINUOUSLY) and the “interval” at which 
Flink is going to monitor the directory (path) for new files. 

In addition you can find some helper methods in the StreamExecutionEnvironment 
class that allow you to avoid specifying some parameters.
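
(A sketch of what using one of those helpers could look like; the directory
path and the 60-second interval are placeholders, and the exact overloads
and helper classes such as FilePathFilter.createDefaultFilter() may differ
between releases:)

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
// FileProcessingMode and FilePathFilter come from the Flink streaming API;
// their package names vary by release.

String logDir = "hdfs:///logs/incoming";       // placeholder directory

DataStream<String> lines = env.readFile(
    new TextInputFormat(new Path(logDir)),     // how to read each file
    logDir,                                    // which directory to monitor
    FileProcessingMode.PROCESS_CONTINUOUSLY,   // keep watching for new files
    60_000L,                                   // re-scan interval in ms
    FilePathFilter.createDefaultFilter());     // skip hidden/partial files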

I believe that with the above two features (RollingSink and the continuous
monitoring source), Flink can be the tool for your job, as both of them also
provide exactly-once guarantees.

I hope this helps.

Let us know what you think,
Kostas

> On Jul 22, 2016, at 9:03 PM, Suma Cherukuri  
> wrote:
> 
> Hi,
> 
> Good Afternoon!
> 
> I work as an engineer at Symantec. My team works on Multi-tenant Event 
> Processing System. Just a high level background, our customers write data to 
> kafka brokers through agents like logstash and we process the events and save
> the log data in Elastic Search and S3.
> 
> Use Case: We have a use case where in we write batches of events to S3 when 
> file size limitation of 1MB (specific to our case) or a certain time 
> threshold is reached.  We are planning on merging the number of files 
> specific to a folder into one single file based on either time limit such as 
> every 24 hrs.
> 
> We were considering various options available today and would like to know if 
> Apache Flink can be used to serve the purpose.
> 
> Looking forward to hearing from you.
> 
> Thank you
> Suma Cherukuri
> 



Re: Supporting Multi-tenancy in a Flink

2016-07-26 Thread Ufuk Celebi
On Mon, Jul 25, 2016 at 5:38 AM, Aparup Banerjee (apbanerj)
 wrote:
> We are building a Stream processing system using Apache beam on top of Flink
> using the Flink Runner. Our pipelines take Kafka streams as sources , and
> can write to multiple sinks. The system needs to be tenant aware. Tenants
> can share same Kafka topic. Tenants can write their own pipelines. We are
> providing a small framework to write pipelines (on top of beam),  so we have
> control of what data stream is available to pipeline developer. I am looking
> for some strategies for following :
>
> How can I partition / group the data in a way that pipeline developers don’t
> need to care about tenancy , but the data integrity is maintained ?
> Ways in which I can assign compute(work nodes for e.g) to different jobs
> based on Tenant configuration.

There is no built-in support for this in Flink, but King.com worked on
something similar using custom operators. You can check out the blog
post here: https://techblog.king.com/rbea-scalable-real-time-analytics-king/

I'm pulling in Gyula (cc'd) who worked on the implementation at King...


Re: dynamic streams and patterns

2016-07-26 Thread Ufuk Celebi
On Mon, Jul 25, 2016 at 10:09 AM, Claudia Wegmann  wrote:
> To 3) Would an approach similar to King/RBEA even be possible combined with
> Flink CEP? As I understand, Patterns have to be defined in Java code and
> therefore have to be recompiled? Do I overlook something important?

Pulling in Till (cc'd) who worked on the CEP library.


Re: Question about Checkpoint Storage (RocksDB)

2016-07-26 Thread Ufuk Celebi
On Mon, Jul 25, 2016 at 8:50 PM, Sameer W  wrote:
> The question is, if using really long windows (in hours) if the state of the
> window gets very large over time, would size of the RocksDB get larger?
> Would replication to HDFS start causing performance bottlenecks? Also would
> this need a constant (at checkpoint interval?), read from RocksDB, add more
> window elements and write to RocksDB.

Yes. The size of the RocksDB instance is directly correlated with the
number of K/V state pairs you store. You can remove state by calling
`clear()` on the KvState instance.

All state updates go directly to RocksDB and snapshots copy the DB
files (semi-async mode, current default) or iterate-and-copy all K/V
pairs (fully-async mode). No records are deleted automatically after
snapshots.

Snapshotting large RocksDB instances will cause some slow down, but
you can trade this cost off by adjusting the checkpointing interval.
There are plans to do the snapshots in an incremental fashion in order
to lower the costs for this, but there is no design doc available for
it at this point.

> Outside of the read costs, is there a risk to having very long windows when
> you know you could collect a lot of elements in them. Instead is it safer to
> perform aggregations on top of aggregations or use your own custom remote
> store like HBase to persist larger state per record and use windows only to
> store the keys in HBase. I mention HBase because of its support for column
> qualifiers allow elements to be added to the same key in multiple ordered
> column qualifiers. Reading can also be throttled in batches of column
> qualifiers allowing for the better memory consumption. Is this approach used
> in practice?

RocksDB works quite well for large stateful jobs. If possible for your
use case, I would still recommend working with pre-aggregating window
functions
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions)
or pre-aggregating the data. The I/O costs will correlate with the
state size, but there is "no risk" in the sense that it will still
work as expected.

What you describe with HBase could work, but I'm not aware of anyone
doing this. Furthermore, depending on your use case, it can cause
problems in failure scenarios, because you might need to keep HBase
and Flink state in sync.


tumbling time window, date boundary and timezone

2016-07-26 Thread Hironori Ogibayashi
Hello,

I want to calculate daily access count using Flink streaming.
Flink's TumblingProcessingTimeWindows assigns events to windows of
00:00 GMT to 23:59 GMT each day, but I live in Japan (GMT+09:00) and
want the date boundaries to be at 15:00 GMT (00:00 JST).
Do I have to implement my own WindowAssigner for this use case?
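
(For reference, later Flink releases added an offset parameter to the
built-in tumbling window assigners, which covers this case without a custom
WindowAssigner; a sketch, assuming a version that supports the offset
argument and a hypothetical keyed stream of page-view counts:)

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Shift the 1-day boundary by -9 hours so it aligns with 00:00 JST.
pageViews
    .keyBy("pageId")
    .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-9)))
    .sum("count");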

Thanks,
Hironori Ogibayashi


Re: Performance issues with GroupBy?

2016-07-26 Thread Ufuk Celebi
+1 to what Gábor said. The hash combine will be part of the upcoming
1.1 release, too.

This could be further amplified by the blocking intermediate results,
whose very simplistic implementation writes out many different files,
which can lead to a lot of random I/O.

– Ufuk

On Tue, Jul 26, 2016 at 11:41 AM, Gábor Gévay  wrote:
> Hello Robert,
>
>> Is there something I might could do to optimize the grouping?
>
> You can try to make your `RichGroupReduceFunction` implement the
> `GroupCombineFunction` interface, so that Flink can do combining
> before the shuffle, which might significantly reduce the network load.
> (How much the combiner helps the performance can greatly depend on how
> large are your groups on average.)
>
> Alternatively, if you can reformulate your algorithm to use a `reduce`
> instead of a `reduceGroup` that might also improve the performance.
> Also, if you are using a `reduce`, then you can try calling
> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
> hint is a relatively new feature, so you need the current master for
> this.)
>
> Best,
> Gábor
>
>
>
> 2016-07-25 14:06 GMT+02:00 Paschek, Robert :
>> Hi Mailing List,
>>
>>
>>
>> i actually do some benchmarks with different algorithms. The System has 8
>> nodes and a configured parallelism of 48 - The IBM-Power-1 cluster, if
>> somebody from the TU Berlin read this : - ) – and to “simulate” Hadoop
>> MapReduce, the execution mode is set to “BATCH_FORCED”
>>
>>
>>
>> It is suspicious, that three of the six algorithms had a big gap in runtime
>> (5000ms vs. 2ms) for easy (low dim) tuple. Additionally the algorithms
>> in the “upper” group using a groupBy transformation and the algorithms in
>> the “lower” group don’t use groupBy.
>>
>> I attached the plot for better visualization.
>>
>>
>>
>> I also checked the logs, especially the time, when the mappers finishing and
>> the reducers start _iterating_ - they hardened my speculation.
>>
>>
>>
>> So my question is, if it is “normal”, that grouping is so cost-intensive
>> that – in my case – the runtime increases by 4 times?
>>
>> I have data from the same experiments running on a 13 nodes cluster with 26
>> cores with Apache Hadoop MapReduce, where the gap is still present, but
>> smaller (50s vs 57s or 55s vs 65s).
>>
>>
>>
>> Is there something I might could do to optimize the grouping? Some
>> codesnipplets:
>>
>>
>>
>> The Job:
>> DataSet output = input
>>
>> .mapPartition(new
>> MR_GPMRS_Mapper()).withBroadcastSet(metaData,
>> "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_MAPPER")
>>
>> .groupBy(0)
>>
>> .reduceGroup(new
>> MR_GPMRS_Reducer()).returns(input.getType()).withBroadcastSet(metaData,
>> "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_REDUCER");
>>
>>
>>
>> MR_GPMRS_Mapper():
>>
>> public class MR_GPMRS_Mapper  extends
>> RichMapPartitionFunction,
>> BitSet, BitSet>>>
>>
>>
>>
>> MR_GPMRS_Reducer():
>>
>> public class MR_GPMRS_Reducer  extends
>> RichGroupReduceFunction,
>> BitSet, BitSet>>, T>
>>
>>
>>
>> The Tuple2 has as Payload on position f1 the Tuple3 and on position f0 the
>> Integer Key for grouping.
>>
>>
>>
>> Any suggestions (or comments, that it is a “normal” behaviour) are welcome :
>> - )
>>
>>
>>
>> Thank you in advance!
>>
>> Robert


Re: Performance issues with GroupBy?

2016-07-26 Thread Gábor Gévay
Hello Robert,

> Is there something I might could do to optimize the grouping?

You can try to make your `RichGroupReduceFunction` implement the
`GroupCombineFunction` interface, so that Flink can do combining
before the shuffle, which might significantly reduce the network load.
(How much the combiner helps the performance can greatly depend on how
large your groups are on average.)

Alternatively, if you can reformulate your algorithm to use a `reduce`
instead of a `reduceGroup` that might also improve the performance.
Also, if you are using a `reduce`, then you can try calling
`.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
hint is a relatively new feature, so you need the current master for
this.)
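
(A rough sketch of the combiner idea above; the Tuple2<Integer, Long> type
and the summing logic are illustrative only, and whether a combine step is
applicable depends on what MR_GPMRS_Reducer actually aggregates:)

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// A group reducer that also acts as its own combiner: partial results are
// merged on the sending side before the shuffle, reducing network traffic.
public static class SumReducer
        extends RichGroupReduceFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>
        implements GroupCombineFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {

    @Override
    public void combine(Iterable<Tuple2<Integer, Long>> values,
                        Collector<Tuple2<Integer, Long>> out) {
        reduce(values, out);  // same logic, applied pre-shuffle
    }

    @Override
    public void reduce(Iterable<Tuple2<Integer, Long>> values,
                       Collector<Tuple2<Integer, Long>> out) {
        int key = 0;
        long sum = 0;
        for (Tuple2<Integer, Long> v : values) {
            key = v.f0;
            sum += v.f1;
        }
        out.collect(Tuple2.of(key, sum));
    }
}

// With a plain reduce() instead, the combine hint mentioned above would be
// attached as input.groupBy(0).reduce(fn).setCombineHint(CombineHint.HASH).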

Best,
Gábor



2016-07-25 14:06 GMT+02:00 Paschek, Robert :
> Hi Mailing List,
>
>
>
> i actually do some benchmarks with different algorithms. The System has 8
> nodes and a configured parallelism of 48 - The IBM-Power-1 cluster, if
> somebody from the TU Berlin read this : - ) – and to “simulate” Hadoop
> MapReduce, the execution mode is set to “BATCH_FORCED”
>
>
>
> It is suspicious, that three of the six algorithms had a big gap in runtime
> (5000ms vs. 2ms) for easy (low dim) tuple. Additionally the algorithms
> in the “upper” group using a groupBy transformation and the algorithms in
> the “lower” group don’t use groupBy.
>
> I attached the plot for better visualization.
>
>
>
> I also checked the logs, especially the time, when the mappers finishing and
> the reducers start _iterating_ - they hardened my speculation.
>
>
>
> So my question is, if it is “normal”, that grouping is so cost-intensive
> that – in my case – the runtime increases by 4 times?
>
> I have data from the same experiments running on a 13 nodes cluster with 26
> cores with Apache Hadoop MapReduce, where the gap is still present, but
> smaller (50s vs 57s or 55s vs 65s).
>
>
>
> Is there something I might could do to optimize the grouping? Some
> codesnipplets:
>
>
>
> The Job:
> DataSet output = input
>
> .mapPartition(new
> MR_GPMRS_Mapper()).withBroadcastSet(metaData,
> "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_MAPPER")
>
> .groupBy(0)
>
> .reduceGroup(new
> MR_GPMRS_Reducer()).returns(input.getType()).withBroadcastSet(metaData,
> "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_REDUCER");
>
>
>
> MR_GPMRS_Mapper():
>
> public class MR_GPMRS_Mapper  extends
> RichMapPartitionFunction,
> BitSet, BitSet>>>
>
>
>
> MR_GPMRS_Reducer():
>
> public class MR_GPMRS_Reducer  extends
> RichGroupReduceFunction,
> BitSet, BitSet>>, T>
>
>
>
> The Tuple2 has as Payload on position f1 the Tuple3 and on position f0 the
> Integer Key for grouping.
>
>
>
> Any suggestions (or comments, that it is a “normal” behaviour) are welcome :
> - )
>
>
>
> Thank you in advance!
>
> Robert