CEP and Within Clause
Hi,

It looks like the within clause of CEP uses tumbling windows. I could get it to use sliding windows by adding an upstream pipeline that uses sliding windows and produces repeating elements (one copy per sliding window), and then applying a watermark assigner on the resulting stream with the duplicated elements. I want to use the "followedBy" pattern, where there is a strong need for sliding windows. Is there a plan to add sliding windows to the within clause at some point?

The PatternStream class's "select" and "flatSelect" methods have overloaded versions which take a PatternTimeoutFunction. Is there a way to insert some of those timed-out elements back at the front of the stream? Say I am trying to find a pattern where two temperature readings >150 within a 6-second window should raise an alert. If only one was found, can I insert that one back at the front of the stream on that task node (for that window pane), so that I can find a pattern match in the events occurring in the next 6 seconds? If I can do that, I don't need sliding windows; otherwise I cannot avoid using them for such scenarios.

Thanks,
Sameer
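For reference, the detection Sameer describes (alert on any two readings above a threshold at most 6 seconds apart, i.e. a true sliding match rather than a tumbling-aligned one) can be sketched outside of Flink CEP with a small buffer of recent over-threshold timestamps. The class and method names below are illustrative, not part of any Flink API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SlidingAlert {
    private static final double THRESHOLD = 150.0;
    private static final long WINDOW_MS = 6_000;

    // Timestamps (ms) of recent readings above the threshold that may still pair up.
    private final Deque<Long> hot = new ArrayDeque<>();

    /** Returns true if this reading pairs with an earlier >THRESHOLD reading within WINDOW_MS. */
    public boolean onReading(long tsMillis, double temp) {
        if (temp <= THRESHOLD) {
            return false;
        }
        // Drop hot readings that are too old to pair with the current one.
        while (!hot.isEmpty() && tsMillis - hot.peekFirst() > WINDOW_MS) {
            hot.removeFirst();
        }
        boolean alert = !hot.isEmpty(); // a sufficiently recent hot reading exists
        hot.addLast(tsMillis);
        return alert;
    }
}
```

Because the buffer only ever holds timestamps from the last 6 seconds, no element needs to be "re-inserted" at the front of the stream: a lone hot reading simply stays eligible until it expires.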
Stop then start the job: how to load the latest checkpoint automatically?
I want to load previous state, and I understand I could do this by specifying a savepoint. Is there a way to do this automatically, given that I do not change my code (jar)?

-- Cheers,
Shaosu
Re: .so linkage error in Cluster
Out of curiosity I've tried this locally by adding the following dependencies to my Maven project:

    <dependency>
      <groupId>org.bytedeco</groupId>
      <artifactId>javacpp</artifactId>
      <version>1.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.bytedeco.javacpp-presets</groupId>
      <artifactId>opencv</artifactId>
      <version>3.1.0-1.2</version>
    </dependency>

With this, running mvn clean package works as expected.

On Tue, Jul 26, 2016 at 7:09 PM, Ufuk Celebi wrote:
> What error message do you get from Maven?
>
> On Tue, Jul 26, 2016 at 4:39 PM, Debaditya Roy wrote:
>> Hello,
>>
>> I am using the jar builder from the IntelliJ IDE (the mvn one was causing
>> problems). After that I executed it successfully locally. But remotely it
>> is causing a problem.
>>
>> Warm Regards,
>> Debaditya
>>
>> On Tue, Jul 26, 2016 at 4:36 PM, Ufuk Celebi wrote:
>>> Yes, the BlobCache on each TaskManager node should fetch it from the
>>> JobManager. How are you packaging your JAR?
>>>
>>> On Tue, Jul 26, 2016 at 4:32 PM, Debaditya Roy wrote:
>>>> [quoted original message and stack trace trimmed]
Re: .so linkage error in Cluster
What error message do you get from Maven?

On Tue, Jul 26, 2016 at 4:39 PM, Debaditya Roy wrote:
> Hello,
>
> I am using the jar builder from the IntelliJ IDE (the mvn one was causing
> problems). After that I executed it successfully locally. But remotely it
> is causing a problem.
>
> Warm Regards,
> Debaditya
>
> On Tue, Jul 26, 2016 at 4:36 PM, Ufuk Celebi wrote:
>> Yes, the BlobCache on each TaskManager node should fetch it from the
>> JobManager. How are you packaging your JAR?
>>
>> On Tue, Jul 26, 2016 at 4:32 PM, Debaditya Roy wrote:
>>> [quoted original message and stack trace trimmed]
Re: Performance issues with GroupBy?
Hi Robert,

Are you able to simplify your function input/output types? Flink aggressively serializes the data stream, and complex types such as ArrayList and BitSet will be much slower to process. Are you able to restructure the lists into groupings on elements?

Greg

On Mon, Jul 25, 2016 at 8:06 AM, Paschek, Robert <robert.pasc...@tu-berlin.de> wrote:
> Hi Mailing List,
>
> I am actually doing some benchmarks with different algorithms. The system has 8
> nodes and a configured parallelism of 48 (the IBM-Power-1 cluster, if
> somebody from the TU Berlin reads this :-)), and to "simulate" Hadoop
> MapReduce, the execution mode is set to "BATCH_FORCED".
>
> It is suspicious that three of the six algorithms show a big gap in
> runtime (5000 ms vs. 2 ms) for easy (low-dimensional) tuples. Additionally, the
> algorithms in the "upper" group use a groupBy transformation, and the
> algorithms in the "lower" group don't use groupBy.
> I attached the plot for better visualization.
>
> I also checked the logs, especially the times when the mappers finish
> and the reducers start *iterating*; they hardened my speculation.
>
> So my question is whether it is "normal" that grouping is so cost-intensive
> that, in my case, the runtime increases by 4 times?
> I have data from the same experiments running on a 13-node cluster with
> 26 cores with Apache Hadoop MapReduce, where the gap is still present, but
> smaller (50s vs. 57s, or 55s vs. 65s).
>
> Is there something I could do to optimize the grouping? Some code snippets:
>
> The job:
>
> DataSet output = input
>     .mapPartition(new MR_GPMRS_Mapper())
>         .withBroadcastSet(metaData, "MetaData")
>         .withParameters(parameters)
>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_MAPPER")
>     .groupBy(0)
>     .reduceGroup(new MR_GPMRS_Reducer())
>         .returns(input.getType())
>         .withBroadcastSet(metaData, "MetaData")
>         .withParameters(parameters)
>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_REDUCER");
>
> MR_GPMRS_Mapper():
>
> public class MR_GPMRS_Mapper extends RichMapPartitionFunction<..., Tuple2<Integer, Tuple3<..., BitSet, BitSet>>>
>
> MR_GPMRS_Reducer():
>
> public class MR_GPMRS_Reducer extends RichGroupReduceFunction<Tuple2<Integer, Tuple3<..., BitSet, BitSet>>, T>
>
> The Tuple2 has the Tuple3 as payload on position f1, and on position f0 the
> Integer key for grouping.
>
> Any suggestions (or comments that this is "normal" behaviour) are welcome :-)
>
> Thank you in advance!
> Robert
Re: .so linkage error in Cluster
Hello,

I am using the jar builder from the IntelliJ IDE (the mvn one was causing problems). After that I executed it successfully locally. But remotely it is causing a problem.

Warm Regards,
Debaditya

On Tue, Jul 26, 2016 at 4:36 PM, Ufuk Celebi wrote:
> Yes, the BlobCache on each TaskManager node should fetch it from the
> JobManager. How are you packaging your JAR?
>
> On Tue, Jul 26, 2016 at 4:32 PM, Debaditya Roy wrote:
> > [quoted original message and stack trace trimmed]
Re: .so linkage error in Cluster
Yes, the BlobCache on each TaskManager node should fetch it from the JobManager. How are you packaging your JAR?

On Tue, Jul 26, 2016 at 4:32 PM, Debaditya Roy wrote:
> [quoted original message and stack trace trimmed]
.so linkage error in Cluster
Hello users,

I am having a problem while running my Flink program in a cluster. It gives me an error that it is unable to find an .so file in a tmp directory.

Caused by: java.lang.UnsatisfiedLinkError: no jniopencv_core in java.library.path
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
    at java.lang.Runtime.loadLibrary0(Runtime.java:870)
    at java.lang.System.loadLibrary(System.java:1122)
    at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:654)
    at org.bytedeco.javacpp.Loader.load(Loader.java:492)
    at org.bytedeco.javacpp.Loader.load(Loader.java:409)
    at org.bytedeco.javacpp.opencv_core.<clinit>(opencv_core.java:10)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.bytedeco.javacpp.Loader.load(Loader.java:464)
    at org.bytedeco.javacpp.Loader.load(Loader.java:409)
    at org.bytedeco.javacpp.helper.opencv_core$AbstractArray.<clinit>(opencv_core.java:109)
    at loc.video.FlinkStreamSource.run(FlinkStreamSource.java:95)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsatisfiedLinkError: /tmp/javacpp5400264496782/libjniopencv_core.so: libgomp.so.1: cannot open shared object file: No such file or directory
    at java.lang.ClassLoader$NativeLibrary.load(Native Method)
    at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
    at java.lang.Runtime.load0(Runtime.java:809)
    at java.lang.System.load(System.java:1086)
    at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:637)

I searched for the temp directory, and on one of the nodes this directory and the .jar file were present. Is it required to have the file across all the nodes? If yes, is there any way to control it? This tmp directory and the .so file get extracted during the runtime without any external manipulation.

Thanks in advance.

Regards,
Debaditya
Re: Supporting Multi-tenancy in a Flink
Hi,

Well, what we are doing at King is trying to solve a similar problem. It would be great if you could read the blog post, because it goes into detail about the actual implementation, but let me recap here quickly:

We are building a stream processing system that data scientists and other developers share at King, in a way that they can use it through a simple web interface without knowing any operational details. The stream processing system itself is one complex Flink job that receives both the events and the user scripts/jobs, which are written in a higher-level DSL. The DSL is designed so that we can execute the operations in a fixed streaming topology instead of having to dynamically deploy new jobs for every new script.

Both scripts and events are sent through Kafka, so this makes our backend Flink job naturally multi-tenant. This is of course not always appropriate, as there is no resource isolation between individual scripts, but we can work around this by dedicating backend jobs to different teams.

Let me know if this helps!
Gyula

Aparup Banerjee (apbanerj) wrote (on Tue, Jul 26, 2016, 14:50):
> Thanks.
>
> Hi Gyula, anything you can share on this?
>
> Aparup
>
> [quoted earlier messages in the thread trimmed]
Re: Supporting Multi-tenancy in a Flink
Thanks.

Hi Gyula, anything you can share on this?

Aparup

On 7/26/16, 4:38 AM, "Ufuk Celebi" wrote:
> [quoted earlier messages in the thread trimmed]
Re: Question about Checkpoint Storage (RocksDB)
On Tue, Jul 26, 2016 at 2:15 PM, Sameer W wrote:
> 1. Calling clear() on the KV state is only possible for snapshots, right? Do
> you control that for checkpoints too?

What do you mean by snapshots vs. checkpoints exactly?

> 2. Assuming that the user has no control over the checkpoint process outside
> of controlling the checkpoint interval, when is RocksDB cleared of the
> operator state for checkpoints after they are long past? It seems like there
> are only two checkpoints that are really necessary to maintain: the current
> one and the previous one for restores. Does Flink clean up checkpoints on a
> timer? When it does clean up checkpoints, does it also clean up the state
> backend (I am assuming they are different)?

Yes, here: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html
By default, only one completed checkpoint is kept.

> 3. The pre-aggregating windows answer was very helpful, as the WindowFunction is
> now passed the pre-aggregated state. For windows, are the Reduce and Fold
> functions called on each element even before the window is triggered? I can
> see how that would work where the pre-compute is done per element but the
> actual output is emitted only when the window is fired. But is that only
> possible if there are no Evictors defined on the window? Also, how are the
> elements fed to the Reduce/Fold function? Is it like MapReduce, where even if
> you are using an Iterator, in reality all the values for a key are not
> buffered into memory? Which ties back to how RocksDB is used to store a
> large window state before it is triggered. If my elements are accumulating
> in a window (serving a ReduceFunction), does it spill to disk (RocksDB?) when
> a threshold size is reached?

- The function is called before adding the element to the window KV state.
- Yes, this is only possible if no evictors are defined.
- The window reduce function is applied directly on the elements of the stream and then updates the KvState instance (e.g. updates RocksDB).
- Operations with RocksDB always touch RocksDB, which takes care of spilling etc.
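Ufuk's point about pre-aggregation can be illustrated outside Flink: with a ReduceFunction-style aggregate and no evictor, the per-window state is a single accumulated value rather than a buffer of all elements. A minimal plain-Java sketch (class and method names are illustrative, not Flink API):

```java
public class IncrementalMax {
    // Pre-aggregated window state: one value, not a buffer of elements.
    private Double acc = null;

    /** Applied as each element arrives, before the window fires
        (the way a window ReduceFunction is applied when no evictor is defined). */
    public void add(double value) {
        acc = (acc == null) ? value : Math.max(acc, value);
    }

    /** The value emitted when the window fires. Assumes add() was called at least once. */
    public double result() {
        return acc;
    }
}
```

Because only `acc` is kept per key/window, the state that must be checkpointed stays constant-sized no matter how many elements the window sees.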
Re: Question about Checkpoint Storage (RocksDB)
Thanks Ufuk, that was very helpful. But it raised a few more questions :-):

1. Calling clear() on the KV state is only possible for snapshots, right? Do you control that for checkpoints too?

2. Assuming that the user has no control over the checkpoint process outside of controlling the checkpoint interval, when is RocksDB cleared of the operator state for checkpoints after they are long past? It seems like there are only two checkpoints that are really necessary to maintain: the current one and the previous one for restores. Does Flink clean up checkpoints on a timer? When it does clean up checkpoints, does it also clean up the state backend (I am assuming they are different)?

3. The pre-aggregating windows answer was very helpful, as the WindowFunction is now passed the pre-aggregated state. For windows, are the Reduce and Fold functions called on each element even before the window is triggered? I can see how that would work where the pre-compute is done per element but the actual output is emitted only when the window is fired. But is that only possible if there are no Evictors defined on the window? Also, how are the elements fed to the Reduce/Fold function? Is it like MapReduce, where even if you are using an Iterator, in reality all the values for a key are not buffered into memory? Which ties back to how RocksDB is used to store a large window state before it is triggered. If my elements are accumulating in a window (serving a ReduceFunction), does it spill to disk (RocksDB?) when a threshold size is reached?

Thanks,
Sameer

On Tue, Jul 26, 2016 at 7:29 AM, Ufuk Celebi wrote:
> On Mon, Jul 25, 2016 at 8:50 PM, Sameer W wrote:
> > The question is, if using really long windows (in hours), if the state of the
> > window gets very large over time, would the size of the RocksDB instance get
> > larger? Would replication to HDFS start causing performance bottlenecks? Also,
> > would this need a constant (at checkpoint interval?) read from RocksDB, add
> > more window elements, and write to RocksDB?
>
> Yes. The size of the RocksDB instance is directly correlated with the
> number of K/V state pairs you store. You can remove state by calling
> `clear()` on the KvState instance.
>
> All state updates go directly to RocksDB, and snapshots copy the DB
> files (semi-async mode, current default) or iterate-and-copy all K/V
> pairs (fully-async mode). No records are deleted automatically after
> snapshots.
>
> Snapshotting large RocksDB instances will cause some slowdown, but
> you can trade this cost off by adjusting the checkpointing interval.
> There are plans to do the snapshots in an incremental fashion in order
> to lower the costs, but there is no design doc available for it at this point.
>
> > Outside of the read costs, is there a risk to having very long windows when
> > you know you could collect a lot of elements in them? Instead, is it safer to
> > perform aggregations on top of aggregations, or use your own custom remote
> > store like HBase to persist larger state per record and use windows only to
> > store the keys in HBase? I mention HBase because its support for column
> > qualifiers allows elements to be added to the same key in multiple ordered
> > column qualifiers. Reading can also be throttled in batches of column
> > qualifiers, allowing for better memory consumption. Is this approach used
> > in practice?
>
> RocksDB works quite well for large stateful jobs. If possible for your
> use case, I would still recommend working with pre-aggregating window
> functions (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions)
> or pre-aggregating the data. The I/O costs will correlate with the
> state size, but there is "no risk" in the sense that it will still
> work as expected.
>
> What you describe with HBase could work, but I'm not aware of someone
> doing this. Furthermore, depending on your use case, it can cause
> problems in failure scenarios, because you might need to keep HBase
> and Flink state in sync.
Re: flink batch data processing
Are you using the DataSet or DataStream API?

Yes, most Flink transformations operate on single tuples, but you can work around it:

- You could write a custom source function which emits records that contain X points (https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#data-sources)
- You can use a MapPartition (https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#mappartition) or FlatMap (https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#flatmap) function and create the batches manually.

Does this help?

On Fri, Jul 22, 2016 at 7:21 PM, Paul Joireman wrote:
> I'm evaluating Flink for some processing of batches of data. As a simple
> example, say I have 2000 points which I would like to pass through an FIR
> filter using functionality provided by the Python scipy library. The scipy
> filter is a simple function which accepts a set of coefficients and the data
> to filter, and returns the data. Is it possible to create a transformation to
> handle this in Flink? It seems Flink transformations are applied on a
> point-by-point basis, but I may be missing something.
>
> Paul
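The MapPartition workaround Ufuk suggests boils down to consuming the partition's element iterator and emitting fixed-size batches, which can then be handed to a filter function as a whole. A plain-Java sketch of that batching logic (not Flink API; names are illustrative):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class Batcher {
    /** Groups an iterator of points into consecutive batches of at most n elements,
        the way a MapPartition function could before applying a per-batch filter. */
    public static List<List<Double>> batches(Iterator<Double> points, int n) {
        List<List<Double>> out = new ArrayList<>();
        List<Double> current = new ArrayList<>(n);
        while (points.hasNext()) {
            current.add(points.next());
            if (current.size() == n) {
                out.add(current);
                current = new ArrayList<>(n);
            }
        }
        if (!current.isEmpty()) {
            out.add(current); // trailing partial batch
        }
        return out;
    }
}
```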
Re: Question about Apache Flink Use Case
Hi Suma Cherukuri,

I also replied to your question on the dev list, but I repeat the answer here just in case you missed it.

From what I understand, you have many small files and you want to aggregate them into bigger ones containing the logs of the last 24 h. As Max said, RollingSinks will allow you to have exactly-once semantics when writing your aggregated results to your FS.

As far as reading your input is concerned, Flink recently integrated functionality to periodically monitor a directory, e.g. your log directory, and process only the new files as they appear. This will be part of the 1.1 release, which is coming possibly during this week or the next, but you can always find it on the master branch. The method that you need is:

readFile(FileInputFormat inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter, TypeInformation typeInformation)

which allows you to specify the FileProcessingMode (which you should set to FileProcessingMode.PROCESS_CONTINUOUSLY) and the "interval" at which Flink is going to monitor the directory (path) for new files. In addition, you can find some helper methods in the StreamExecutionEnvironment class that allow you to avoid specifying some parameters.

I believe that with the above two features (RollingSink and the continuous monitoring source) Flink can be the tool for your job, as both of them also provide exactly-once guarantees.

I hope this helps. Let us know what you think,
Kostas

> On Jul 22, 2016, at 9:03 PM, Suma Cherukuri wrote:
>
> Hi,
>
> Good Afternoon!
>
> I work as an engineer at Symantec. My team works on a multi-tenant event
> processing system. Just as high-level background, our customers write data to
> Kafka brokers through agents like Logstash, and we process the events and
> save the log data in Elasticsearch and S3.
>
> Use case: We have a use case wherein we write batches of events to S3 when a
> file size limitation of 1 MB (specific to our case) or a certain time
> threshold is reached. We are planning on merging the files specific to a
> folder into one single file based on a time limit, such as every 24 hrs.
>
> We were considering various options available today and would like to know
> if Apache Flink can be used to serve this purpose.
>
> Looking forward to hearing from you.
>
> Thank you
> Suma Cherukuri
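The continuous-monitoring behaviour Kostas describes (process only files not seen before, on each polling interval) can be sketched independently of Flink as a set of already-seen paths. The class and method names below are illustrative, not the Flink implementation:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NewFileMonitor {
    // Paths already handed downstream; only additions to the listing are emitted.
    private final Set<String> seen = new HashSet<>();

    /** Given the current directory listing, returns only the files that were
        not present on any earlier poll (the PROCESS_CONTINUOUSLY behaviour). */
    public List<String> poll(List<String> listing) {
        List<String> fresh = new ArrayList<>();
        for (String f : listing) {
            if (seen.add(f)) { // add() is false if the path was already seen
                fresh.add(f);
            }
        }
        return fresh;
    }
}
```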
Re: Supporting Multi-tenancy in a Flink
On Mon, Jul 25, 2016 at 5:38 AM, Aparup Banerjee (apbanerj) wrote:
> We are building a stream processing system using Apache Beam on top of Flink
> using the Flink runner. Our pipelines take Kafka streams as sources and can
> write to multiple sinks. The system needs to be tenant-aware. Tenants can
> share the same Kafka topic. Tenants can write their own pipelines. We are
> providing a small framework to write pipelines (on top of Beam), so we have
> control of what data stream is available to the pipeline developer. I am
> looking for some strategies for the following:
>
> How can I partition/group the data in a way that pipeline developers don't
> need to care about tenancy, but data integrity is maintained?
> Ways in which I can assign compute (worker nodes, e.g.) to different jobs
> based on tenant configuration.

There is no built-in support for this in Flink, but King.com worked on something similar using custom operators. You can check out the blog post here: https://techblog.king.com/rbea-scalable-real-time-analytics-king/

I'm pulling in Gyula (cc'd) who worked on the implementation at King...
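One common strategy for the partitioning question, independent of Beam or Flink specifics: key every event by its tenant id, so a deterministic hash sends all of a tenant's events to the same partition (the effect keying a stream by tenant id would have), and pipeline code downstream never needs to filter for tenancy. A plain-Java sketch with illustrative names:

```java
public class TenantPartitioner {
    /** Deterministically maps a tenant id to one of numPartitions partitions,
        so all events of a tenant land on the same partition. */
    public static int partitionFor(String tenantId, int numPartitions) {
        int h = tenantId.hashCode();
        return Math.floorMod(h, numPartitions); // floorMod keeps the index non-negative
    }
}
```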
Re: dynamic streams and patterns
On Mon, Jul 25, 2016 at 10:09 AM, Claudia Wegmann wrote: > To 3) Would an approach similar to King/RBEA even be possible combined with > Flink CEP? As I understand it, Patterns have to be defined in Java code and > therefore have to be recompiled? Am I overlooking something important? Pulling in Till (cc'd) who worked on the CEP library.
Re: Question about Checkpoint Storage (RocksDB)
On Mon, Jul 25, 2016 at 8:50 PM, Sameer W wrote: > The question is, when using really long windows (in hours), if the state of the > window gets very large over time, would the size of the RocksDB instance get larger? > Would replication to HDFS start causing performance bottlenecks? Also, would > this need a constant (at checkpoint interval?) read from RocksDB, adding more > window elements and writing back to RocksDB? Yes. The size of the RocksDB instance is directly correlated with the number of K/V state pairs you store. You can remove state by calling `clear()` on the KvState instance. All state updates go directly to RocksDB and snapshots copy the DB files (semi-async mode, the current default) or iterate-and-copy all K/V pairs (fully-async mode). No records are deleted automatically after snapshots. Snapshotting large RocksDB instances will cause some slowdown, but you can trade off this cost by adjusting the checkpointing interval. There are plans to do the snapshots in an incremental fashion in order to lower these costs, but there is no design doc available for it at this point. > Outside of the read costs, is there a risk to having very long windows when > you know you could collect a lot of elements in them? Instead, is it safer to > perform aggregations on top of aggregations, or to use your own custom remote > store like HBase to persist larger state per record and use windows only to > store the keys in HBase? I mention HBase because its support for column > qualifiers allows elements to be added to the same key in multiple ordered > column qualifiers. Reading can also be throttled in batches of column > qualifiers, allowing for better memory consumption. Is this approach used > in practice? RocksDB works quite well for large stateful jobs. If possible for your use case, I would still recommend working with pre-aggregating window functions (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions) or pre-aggregating the data. 
The I/O costs will correlate with the state size, but there is "no risk" in the sense that everything will still work as expected. What you describe with HBase could work, but I'm not aware of anyone doing this. Furthermore, depending on your use case, it can cause problems in failure scenarios, because you would need to keep HBase and Flink state in sync.
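The pre-aggregation recommendation can be made concrete: a reduce-style window function folds each incoming element into a running aggregate, so per-key state stays constant-size instead of growing with every element buffered in the window. A minimal plain-Java sketch of that difference (illustrative only, not Flink's window API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of why a pre-aggregating (reduce-style) window function keeps
// RocksDB state small: per key we store a single running value, not the
// full list of elements that arrived during the (possibly hours-long) window.
public class RunningMax {
    private final Map<String, Long> state = new HashMap<>();

    /** Fold one element into the per-key aggregate; state stays O(#keys). */
    public void add(String key, long value) {
        state.merge(key, value, Math::max);
    }

    public long get(String key) {
        return state.get(key);
    }

    public int stateSize() {
        return state.size();
    }
}
```

With this pattern, a window that sees millions of readings for one key still snapshots a single value for it, which is why long windows stay cheap to checkpoint.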
tumbling time window, date boundary and timezone
Hello, I want to calculate daily access counts using Flink streaming. Flink's TumblingProcessingTimeWindow assigns events to windows of 00:00 GMT to 23:59 GMT each day, but I live in Japan (GMT+09:00) and want the date boundaries to be at 15:00 GMT (00:00 JST). Do I have to implement my own WindowAssigner for this use case? Thanks, Hironori Ogibayashi
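For reference, shifting daily windows to JST midnights comes down to offsetting the window-start arithmetic by -9 hours, which is exactly what a custom WindowAssigner would compute for each element. A minimal plain-Java sketch of that calculation follows; the class and constant names are illustrative, not Flink API.

```java
// Sketch: computing the start of the daily window containing a timestamp,
// with windows aligned to JST (GMT+9) midnights instead of GMT midnights.
// JST midnight falls at 15:00 GMT, hence the -9 hour offset.
public class JstDailyWindow {
    public static final long DAY_MS = 24L * 60 * 60 * 1000;
    public static final long JST_OFFSET_MS = -9L * 60 * 60 * 1000;

    /** Start (epoch millis) of the JST-aligned daily window for `timestamp`. */
    public static long windowStart(long timestamp) {
        // Shift by the offset, take the remainder within one day, subtract it.
        long rem = (timestamp - JST_OFFSET_MS + DAY_MS) % DAY_MS;
        return timestamp - rem;
    }
}
```

For example, an event at 00:00 GMT belongs to the window that opened at 15:00 GMT the previous day, i.e. 00:00 JST of the current Japanese date.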
Re: Performance issues with GroupBy?
+1 to what Gábor said. The hash combine will be part of the upcoming 1.1 release, too. This could be further amplified by the blocking intermediate results, which have a very simplistic implementation writing out many different files, which can lead to a lot of random I/O. – Ufuk On Tue, Jul 26, 2016 at 11:41 AM, Gábor Gévay wrote: > Hello Robert, > >> Is there something I might do to optimize the grouping? > > You can try to make your `RichGroupReduceFunction` implement the > `GroupCombineFunction` interface, so that Flink can do combining > before the shuffle, which might significantly reduce the network load. > (How much the combiner helps the performance can greatly depend on how > large your groups are on average.) > > Alternatively, if you can reformulate your algorithm to use a `reduce` > instead of a `reduceGroup`, that might also improve the performance. > Also, if you are using a `reduce`, then you can try calling > `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine > hint is a relatively new feature, so you need the current master for > this.) > > Best, > Gábor > > > > 2016-07-25 14:06 GMT+02:00 Paschek, Robert : >> Hi Mailing List, >> >> >> >> I am currently doing some benchmarks with different algorithms. The system has 8 >> nodes and a configured parallelism of 48 - the IBM-Power-1 cluster, if >> somebody from the TU Berlin reads this : - ) – and to “simulate” Hadoop >> MapReduce, the execution mode is set to “BATCH_FORCED”. >> >> >> >> It is suspicious that three of the six algorithms show a big gap in runtime >> (5000ms vs. 2ms) for easy (low-dimensional) tuples. Additionally, the algorithms >> in the “upper” group use a groupBy transformation and the algorithms in >> the “lower” group don’t use groupBy. >> >> I attached the plot for better visualization. >> >> >> >> I also checked the logs, especially the times when the mappers finish and >> the reducers start _iterating_ - they confirmed my suspicion. 
>> >> >> >> So my question is whether it is “normal” that grouping is so cost-intensive >> that – in my case – the runtime increases by 4 times? >> >> I have data from the same experiments running on a 13-node cluster with 26 >> cores with Apache Hadoop MapReduce, where the gap is still present, but >> smaller (50s vs 57s or 55s vs 65s). >> >> >> >> Is there something I might do to optimize the grouping? Some >> code snippets: >> >> >> >> The Job: >> DataSet output = input >> >> .mapPartition(new >> MR_GPMRS_Mapper()).withBroadcastSet(metaData, >> "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_MAPPER") >> >> .groupBy(0) >> >> .reduceGroup(new >> MR_GPMRS_Reducer()).returns(input.getType()).withBroadcastSet(metaData, >> "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_REDUCER"); >> >> >> >> MR_GPMRS_Mapper(): >> >> public class MR_GPMRS_Mapper extends >> RichMapPartitionFunction , >> BitSet, BitSet>>> >> >> >> >> MR_GPMRS_Reducer(): >> >> public class MR_GPMRS_Reducer extends >> RichGroupReduceFunction , >> BitSet, BitSet>>, T> >> >> >> >> The Tuple2 has as payload on position f1 the Tuple3 and on position f0 the >> Integer key for grouping. >> >> >> >> Any suggestions (or comments that this is “normal” behaviour) are welcome : >> - ) >> >> >> >> Thank you in advance! >> >> Robert
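To illustrate why the combiner advice above helps: a combine step pre-aggregates values per key on the sending side, so the shuffle carries at most one record per key per sender instead of one record per input element. A minimal plain-Java sketch of that effect (illustrative only, not the actual GroupCombineFunction interface):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of combine-before-shuffle: each mapper collapses its local
// (key, 1) records into (key, partialCount) pairs before sending them
// over the network, shrinking the shuffle from O(#records) to O(#keys).
public class LocalCombiner {
    /** Pre-aggregate an array of keys into local per-key counts. */
    public static Map<Integer, Long> combine(int[] keys) {
        Map<Integer, Long> partial = new HashMap<>();
        for (int k : keys) {
            partial.merge(k, 1L, Long::sum);
        }
        return partial;
    }
}
```

With, say, a million input records spread over ten distinct keys, each sender ships at most ten partial counts instead of a million records, which is why small average group sizes see less benefit than large ones.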