Re: Queryable State and Windows
I just want to add another workaround, which does not need a self-compiled version. You can use a TimeWindow with a CountTrigger.of(1) combined with a FoldFunction for pre-aggregation and a RichWindowFunction to update the queryable state. Additionally, you need a TimeWindow for the final results. So you are doubling the amount of state as well as computation, but depending on the circumstances this might be preferable to tweaking Flink 1.2. I think Jamie Grier did something similar in one of his presentations on the topic.

Cheers,
Konstantin

On 23.01.2017 15:39, Ufuk Celebi wrote:
> This is not possible at the moment. We discussed this a couple of
> times before, but in the end did not want to expose it with the
> initial version, because the interfaces are still very raw. This is
> definitely on the agenda though.
>
> As a workaround you would have to build a custom Flink version that
> calls `setQueryable` on the state descriptors of the WindowOperator.
> If there is an easy, non-intrusive way to activate this for the
> upcoming 1.2 version, I will try to do it.
>
> On Mon, Jan 23, 2017 at 2:46 PM, Joe Olson wrote:
>> From what I've read in the documentation, and from the examples I've seen,
>> in order to make state queryable externally to Flink, the state descriptor
>> variables need access to the Flink runtime context.
>>
>> This means the stream processor has to have access to the 'Rich' level
>> objects - 'RichFlatMap' for example. All the 1.2-SNAPSHOT queryable state
>> examples I have seen revolve around RichFlatMap.
>>
>> Is there a way to get the runtime context exposed so that you can have state
>> descriptor variables queryable from within a Flink window, while the window
>> is loading?
>> My processor is built around the following:
>>
>>     .addSource(new FlinkKafkaConsumer010())
>>     .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
>>     .keyBy()
>>     .window(GlobalWindows.create())
>>     .trigger(new myTrigger())
>>     .apply(new myWindowFunction())
>>     .addSink(new mySink())
>>
>> The only rich object in this chain is available in the apply
>> (RichWindowFunction). But that is too late - I want to be able to query out
>> what's in the window while it is filling. I know I have access to onElement
>> in the trigger, and I can set up the state descriptor variables there, but
>> the variables don't seem to have exposure to the runtime environment within
>> the trigger.
>>
>> Is there a way to get queryable state within a Flink window while it is
>> filling?

--
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
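[Editorial note] The double-window workaround described above can be illustrated outside of Flink. The following plain-Java sketch (it does not use the Flink API; all names are illustrative) shows the core idea: every incoming element immediately folds into a snapshot that is visible to external queries, while a second accumulator holds the value that is only emitted when the real window fires. As noted in the thread, state and computation are duplicated.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the "two windows" workaround: a per-element
// fold keeps an externally queryable snapshot up to date (like a
// CountTrigger.of(1) firing a FoldFunction), while a second accumulator
// is only read out when the final TimeWindow fires.
public class QueryableWindowSketch {
    // Stands in for the queryable state store.
    private final Map<String, Long> queryableSnapshot = new ConcurrentHashMap<>();
    // Stands in for the state of the final window.
    private final Map<String, Long> finalWindowState = new ConcurrentHashMap<>();

    // Called once per element: both copies of the state are updated,
    // which is the doubling of work mentioned in the thread.
    public void onElement(String key, long value) {
        queryableSnapshot.merge(key, value, Long::sum); // query-visible immediately
        finalWindowState.merge(key, value, Long::sum);  // emitted only at window end
    }

    // External query while the window is still filling.
    public long query(String key) {
        return queryableSnapshot.getOrDefault(key, 0L);
    }

    // Called when the final window fires: emit and clear both copies.
    public long emitAndClear(String key) {
        queryableSnapshot.remove(key);
        Long result = finalWindowState.remove(key);
        return result == null ? 0L : result;
    }

    public static void main(String[] args) {
        QueryableWindowSketch sketch = new QueryableWindowSketch();
        sketch.onElement("sensor-1", 10);
        sketch.onElement("sensor-1", 5);
        System.out.println(sketch.query("sensor-1"));        // mid-window query: 15
        System.out.println(sketch.emitAndClear("sensor-1")); // final result: 15
    }
}
```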
Re: Flink checkpointing gets stuck
Thanks! This looks like a bigger example, involving MongoDB, etc. Are you able to reproduce this issue with a smaller example? It would also help to understand the problem better if we knew the topology a bit better. The stack traces look like "phase 1&2" want to send data (but are back pressured) and "phase 3&4&5" wait for input data.

Stephan

On Sun, Feb 26, 2017 at 12:30 PM, Shai Kaplan wrote:
> Running jstack on one of the Task Managers:
>
> 2017-02-26 10:06:27
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.111-b14 mixed mode):
>
> "Attach Listener" #6414 daemon prio=9 os_prio=0 tid=0x7f3c8c089000 nid=0xe692 waiting on condition [0x]
>    java.lang.Thread.State: RUNNABLE
>
> "Async calls on Sink: phase 5 (32/48)" #2337 daemon prio=5 os_prio=0 tid=0x7f3b942fc000 nid=0xb0d5 waiting on condition [0x7f3adf0af000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x7f3d9d000620> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> "Async calls on Sink: phase 5 (31/48)" #2336 daemon prio=5 os_prio=0 tid=0x7f3b942fb000 nid=0xb0d4 waiting on condition [0x7f3adf1b]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x7f3d9fbd7e70> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> "Async calls on Sink: phase 5 (30/48)" #2335 daemon prio=5 os_prio=0 tid=0x7f3b942f9800 nid=0xb0d3 waiting on condition [0x7f3adf2b1000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x7f3da07cdde8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> "Async calls on Sink: phase 5 (29/48)" #2334 daemon prio=5 os_prio=0 tid=0x7f3b942f8800 nid=0xb0d2 waiting on condition [0x7f3adf3b2000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x7f3d9e0003e8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
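[Editorial note] A detail worth knowing when reading the dump above: `WAITING (parking)` inside `LinkedBlockingQueue.take` is the normal state of an idle `ThreadPoolExecutor` worker polling its task queue, so by itself it indicates "waiting for input", not a deadlock. A small self-contained demonstration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Shows that an idle ThreadPoolExecutor worker parks in
// LinkedBlockingQueue.take and reports Thread.State.WAITING, exactly
// like the "Async calls on Sink" threads in the jstack output above.
public class IdleWorkerState {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Thread[] worker = new Thread[1];

        // Run one task just to capture a reference to the worker thread.
        pool.submit(() -> worker[0] = Thread.currentThread()).get();

        // Give the worker a moment to go back to polling the (empty) queue.
        Thread.sleep(200);
        System.out.println(worker[0].getState()); // WAITING, parked in take()

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```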
Re: Flink streaming. Broadcast reference data map across nodes
Hi,
what Ufuk said is valid. In addition, you can make your function a RichFunction and load the static data in the open() method.

In the future, you might be able to handle this use case with a feature called side inputs that we're currently working on: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit

Best,
Aljoscha

On Tue, 21 Feb 2017 at 15:50 Ufuk Celebi wrote:
> On Tue, Feb 21, 2017 at 2:35 PM, Vadim Vararu wrote:
> > Basically, i have a big dictionary of reference data that has to be
> > accessible from all the nodes (in order to do some joins of log line with
> > reference line).
>
> If the dictionary is small you can make it part of the closure that
> is sent to the task managers. Just make it part of your function.
>
> If it is large, I'm not sure what the best way to do it is right
> now. I've CC'd Aljoscha who can probably help...
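[Editorial note] Ufuk's "make it part of your function" suggestion boils down to the function object carrying the dictionary as a serializable field, so it is shipped to the workers along with the closure. A minimal plain-Java sketch (the Flink API is not used here; `EnrichFunction` and `shipToWorker` are illustrative names):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.function.Function;

// A function that carries a small reference dictionary in its closure.
// Because the field is serializable, the whole object can be shipped to
// remote workers, which is how Flink distributes user functions.
public class EnrichFunction implements Function<String, String>, Serializable {
    private final HashMap<String, String> dictionary;

    public EnrichFunction(HashMap<String, String> dictionary) {
        this.dictionary = dictionary;
    }

    @Override
    public String apply(String logKey) {
        return logKey + " -> " + dictionary.getOrDefault(logKey, "unknown");
    }

    // Round-trips the function through Java serialization, mimicking
    // what happens when the closure is sent to a task manager.
    static EnrichFunction shipToWorker(EnrichFunction f)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new ObjectOutputStream(bytes).writeObject(f);
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        return (EnrichFunction) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, String> dict = new HashMap<>();
        dict.put("user-42", "premium");
        EnrichFunction remote = shipToWorker(new EnrichFunction(dict));
        System.out.println(remote.apply("user-42")); // prints "user-42 -> premium"
    }
}
```

This only works for dictionaries small enough to serialize with the job; for large or updating reference data the RichFunction open() approach or the planned side inputs are the better fit, as the reply says.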
Re: Serialization schema
There was a private member variable that was not serializable and was not marked transient. Thanks for the pointer.

On Thu, Feb 23, 2017 at 11:44 PM, Tzu-Li (Gordon) Tai wrote:
> Thanks for clarifying.
>
> From the looks of your exception:
>
> Caused by: java.io.NotSerializableException:
> com.sy.flink.test.Tuple2Serializerr$1
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> com.sy.flink.test.Tuple2Serializerr$1: this states that an anonymous
> inner class in `Tuple2Serializerr` is not serializable.
>
> Could you check if that's the case?
>
> On February 24, 2017 at 3:10:58 PM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>
> But it is not an inner class.
>
> On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai wrote:
>
>> Since I don't have your complete code, I'm guessing this is the problem:
>> Is your `Tuple2Serializer` an inner class? If yes, you should be able to
>> solve the problem by declaring `Tuple2Serializer` to be `static`.
>>
>> This is more of a Java problem -
>> It isn't serializable if it isn't static, because a non-static inner class
>> contains an implicit reference to the enclosing outer class, and
>> serializing it will therefore try to serialize the outer class instance as well.
>> On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>>
>> This is at a high level what I am doing:
>>
>> Serialize:
>>
>> String s = tuple.getPos(0) + "," + tuple.getPos(1);
>> return s.getBytes();
>>
>> Deserialize:
>>
>> String s = new String(message);
>> String[] sarr = s.split(",");
>> Tuple2 tuple = new Tuple2<>(Integer.valueOf(sarr[0]),
>>     Integer.valueOf(sarr[1]));
>> return tuple;
>>
>> On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>>> Hi Mohit,
>>>
>>> As 刘彪 pointed out in his reply, the problem is that your
>>> `Tuple2Serializer` contains fields that are not serializable, so
>>> `Tuple2Serializer` itself is not serializable.
>>> Could you perhaps share your `Tuple2Serializer` implementation with us
>>> so we can pinpoint the problem?
>>>
>>> A snippet of the class fields and constructor will do, so you don't have
>>> to provide the whole `serialize` / `deserialize` implementation if you
>>> don't want to.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>>>
>>> I am using String inside to convert into bytes.
>>>
>>> On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 wrote:
>>>> Hi Mohit
>>>> As you did not give the whole code of Tuple2Serializerr, I guess the
>>>> reason is that some fields of Tuple2Serializerr do not implement
>>>> Serializable.
>>>>
>>>> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia:
>>>>> I wrote a key serialization class to write to kafka however I am
>>>>> getting this error. Not sure why as I've already implemented the
>>>>> interfaces.
>>>>> Caused by: java.io.NotSerializableException:
>>>>> com.sy.flink.test.Tuple2Serializerr$1
>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>
>>>>> And the class implements the following:
>>>>>
>>>>> public class Tuple2Serializerr implements
>>>>>         DeserializationSchema<Tuple2<Integer, Integer>>,
>>>>>         SerializationSchema<Tuple2<Integer, Integer>> {
>>>>>
>>>>> And called like this:
>>>>>
>>>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>>>>>         FlinkKafkaProducer010<>(
>>>>>                 "10.22.4.15:9092",        // broker list
>>>>>                 "my-topic",               // target topic
>>>>>                 new Tuple2Serializerr()); // serialization schema
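[Editorial note] The failure mode diagnosed in this thread can be reproduced in plain Java. The sketch below (a minimal reconstruction, not the poster's actual code; all names are illustrative) shows why a `$1` anonymous inner class fails to serialize when its enclosing class is not serializable, and why a static nested class is the fix Gordon suggested:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Reconstruction of the NotSerializableException above: an anonymous
// inner class (compiled as Outer$1) captures an implicit reference to
// its non-serializable enclosing instance and drags it into serialization.
public class SerializerPitfall {

    // Not serializable, standing in for the enclosing class of the
    // poster's serializer.
    static class Outer {
        // Anonymous inner class: carries a hidden this$0 field
        // pointing at the Outer instance.
        Serializable anonymousField() {
            return new Serializable() {};
        }
    }

    // A static nested class has no hidden outer reference, so it
    // serializes cleanly. This is the suggested fix.
    static class FixedSerializer implements Serializable {}

    static boolean serializes(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // the hidden outer reference is not serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Outer().anonymousField())); // false
        System.out.println(serializes(new FixedSerializer()));        // true
    }
}
```

The same reasoning applies to the thread's eventual resolution: a non-serializable member field also fails this check unless it is marked transient.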