Re: Exception in Flink 1.3.0
Is there any update on when 1.3.1 will be available? We are currently running 1.2.0, but that version has a separate issue (invalid type code: 00): http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/invalid-type-code-00-td13326.html#a13332
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Exception-in-Flink-1-3-0-tp13493p13664.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
Re: Use Single Sink For All windows
Thanks, Aljoscha, for your response. I will give it a try.
1- Flink calls the *invoke* method of SinkFunction to dispatch the aggregated information. My follow-up question: if the sink receives another update while the snapshotState method is in progress, we might mix records. However, per the documentation, all updates stop during a checkpoint, so I assume this works the same way. https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
*"As soon as the operator receives snapshot barrier n from an incoming stream, it cannot process any further records from that stream until it has received the barrier n from the other inputs as well. Otherwise, it would mix records that belong to snapshot n and with records that belong to snapshot n+1."*
*"Streams that report barrier n are temporarily set aside. Records that are received from these streams are not processed, but put into an input buffer."*
2- The snapshotState method is called when a checkpoint is requested. Is there an interface that signals when the checkpoint completes? I mean, I would add my flush logic right after the snapshot completes and before Flink resumes the stream. With this approach we can ensure that we update state only if the checkpoint was successful.
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-Single-Sink-For-All-windows-tp13475p13652.html
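For illustration only, here is a minimal plain-Java sketch of the flush-on-completion idea from point 2. It does not use any Flink classes; the method names merely mirror SinkFunction.invoke, CheckpointedFunction.snapshotState and CheckpointListener.notifyCheckpointComplete, the last being the Flink callback that, as far as I know, signals a completed checkpoint. BufferingSinkModel, the String records and the Consumer standing in for the database write are all hypothetical. The sketch assumes the completion notification can arrive after new records have already been processed, so it stages one batch per checkpoint id instead of relying on the stream being paused.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Plain-Java model of a sink that only writes data covered by a completed checkpoint.
// Hypothetical class: it imitates the Flink method names but has no Flink dependency.
class BufferingSinkModel {
    // Records received since the last snapshot.
    private List<String> current = new ArrayList<>();
    // Batches that were snapshotted but whose checkpoint is not yet confirmed.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Stand-in for the real database write (assumption).
    private final Consumer<List<String>> writer;

    BufferingSinkModel(Consumer<List<String>> writer) {
        this.writer = writer;
    }

    // Mirrors SinkFunction.invoke: buffer the record instead of writing it.
    void invoke(String record) {
        current.add(record);
    }

    // Mirrors snapshotState: freeze the current buffer under this checkpoint id.
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    // Mirrors notifyCheckpointComplete: write every batch up to and including this id.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> done = pending.headMap(checkpointId, true);
        for (List<String> batch : done.values()) {
            if (!batch.isEmpty()) {
                writer.accept(batch);
            }
        }
        done.clear(); // the view is backed by 'pending', so this removes the flushed batches
    }

    int pendingCheckpoints() {
        return pending.size();
    }
}
```

Records that arrive between snapshotState and the completion callback stay in the next batch, so they are written only once their own checkpoint has been confirmed.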
Deterministic Update
Is there any way to trigger the sink operator when a checkpoint completes?
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Deterministic-Update-tp13580.html
Re: Use Single Sink For All windows
Because of parallelism I am seeing DB contention. I am wondering whether I can merge the sinks of multiple windows and insert in a batch.
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-Single-Sink-For-All-windows-tp13475p13525.html
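To make the batching idea concrete, here is a minimal plain-Java sketch of a buffer that collects rows and hands them to a writer in groups. BatchBuffer, maxBatchSize and the Consumer are hypothetical names used only for illustration; in a real sink the consumer would presumably wrap a JDBC batch (PreparedStatement.addBatch followed by executeBatch), which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: groups rows so the database sees one write per batch
// instead of one write per window result.
class BatchBuffer<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> batchWriter; // stand-in for the JDBC batch write (assumption)
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int maxBatchSize, Consumer<List<T>> batchWriter) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.batchWriter = batchWriter;
    }

    // Add one row; write the batch as soon as it is full.
    void add(T row) {
        buffer.add(row);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Write whatever is buffered (call this on close or on a timer as well).
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        batchWriter.accept(new ArrayList<>(buffer)); // pass a copy so the writer owns its list
        buffer.clear();
    }
}
```

A size threshold alone can leave rows waiting indefinitely when traffic is low, so an explicit flush at shutdown or at regular intervals is needed as well.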
Re: Exception in Flink 1.3.0
Thanks for sharing the ticket reference. Is there a timeline for the fix, since this is a blocker?
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Exception-in-Flink-1-3-0-tp13493p13500.html
Re: ProcessFunction broke : Flink 1.3 upgrade
Yes, I see that. So should I change the code from RichProcessFunction to a RichFunction-based object? The current implementation fails to compile because RichProcessFunction has been removed in 1.3.
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ProcessFunction-broke-Flink-1-3-upgrade-tp13492p13498.html
Exception in Flink 1.3.0
After the upgrade I started getting this exception. Is this a bug?
2017-06-05 23:45:03,423 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- UTCStream -> Sink: UTC sink (2/12) (f78ef7f7368d27f414ebb9d0db7a26c8) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 1 for operator UTCStream -> Sink: UTC sink (2/12).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:551)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator UTCStream -> Sink: UTC sink (2/12).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
    ... 8 more
Caused by: java.lang.UnsupportedOperationException
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.snapshotConfiguration(TraversableSerializer.scala:155)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.buildConfigSnapshot(PojoSerializer.java:1183)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.snapshotConfiguration(PojoSerializer.java:572)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.snapshotConfiguration(PojoSerializer.java:55)
    at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:77)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Exception-in-Flink-1-3-0-tp13493.html
ProcessFunction broke : Flink 1.3 upgrade
With 1.3, what should we use for ProcessFunction?
org.apache.flink.streaming.api.functions.RichProcessFunction
org.apache.flink.streaming.api.functions.ProcessFunction.{OnTimerContext, Context}
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ProcessFunction-broke-Flink-1-3-upgrade-tp13492.html
Use Single Sink For All windows
Is it possible to route the updates from all windows to a single sink (as an aggregated collection)? I am asking because we use MySQL as the sink, and I am wondering whether I can write all of the updates in a single batch to avoid possible contention.
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-Single-Sink-For-All-windows-tp13475.html
Re: Checkpoints very slow with high backpressure
Never mind, I found it. The backpressure was caused by the AWS RDS MySQL instance.
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-tp12762p13468.html
Re: Checkpoints very slow with high backpressure
I enabled INFO logging. It seems to be stuck:
==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:18,229 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1496321118221
==> /mnt/ephemeral/logs/flink-flink-taskmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:18,237 INFO org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@79e68dd3 for Async calls on Source: Custom Source (2/12)
2017-06-01 12:45:18,237 INFO org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@78da1e82 for Async calls on Source: Custom Source (5/12)
2017-06-01 12:45:18,238 INFO org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@68bff79e for Async calls on Source: Custom Source (8/12)
2017-06-01 12:45:18,238 INFO org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@600bdc29 for Async calls on Source: Custom Source (11/12)
2017-06-01 12:45:24,853 INFO com.company.deserializer.EventDeserializer - ==> KafkaConsumertest ::
2017-06-01 12:45:24,853 INFO com.company.deserializer.EventDeserializer - ==> KafkaConsumertest ::
2017-06-01 12:45:24,853 INFO com.company.deserializer.EventDeserializer - ==> KafkaConsumertest ::
2017-06-01 12:45:24,854 INFO com.company.deserializer.EventDeserializer - ==> KafkaConsumertest ::
2017-06-01 12:45:24,859 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,859 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,859 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,859 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,859 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,859 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,859 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: parallelism.default, 4
2017-06-01 12:45:24,859 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,859 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: state.backend, filesystem
2017-06-01 12:45:24,860 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,860 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,860 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,862 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,863 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: high-availability, zookeeper
2017-06-01 12:45:24,863 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: high-availability.zookeeper.quorum, host1:2181,host2:2181
2017-06-01 12:45:24,863 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: high-availability.zookeeper.storageDir, s3://somelocation/ha-recovery/
2017-06-01 12:45:24,863 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,863 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,863 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,895 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.address, host
2017-06-01
Re: Checkpoints very slow with high backpressure
I tried extending the timeout to 1 hour, but no luck. It still times out, and there is no exception in the log file, so I am guessing something is stuck; I will dig further. Configuration details: standalone cluster, with checkpoints stored in S3. I only have 217680 messages across 24 partitions. Any idea?
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-tp12762p13419.html
Re: Checkpoints very slow with high backpressure
I tried extending the timeout to 1 hour, but no luck. It still times out, so I am guessing something is stuck; I will dig further. Configuration details: standalone cluster, with checkpoints stored in S3. I only have 217680 messages across 24 partitions. Any idea?
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-tp12762p13418.html
Re: Checkpoints very slow with high backpressure
So what is the resolution? Flink is consuming messages from Kafka. Flink went down about a day ago, so it now has to process 24 hours' worth of events. But I hit backpressure, and right now checkpoints are timing out. Is there any recommendation for handling this situation? It seems the triggers are also not firing, so no updates are being made to the downstream database. Is there a recommended approach for handling backpressure? Version: Flink 1.2.
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-tp12762p13411.html
Re: State in Custom Tumble Window Class
Thanks, Aljoscha Krettek. So the results will not be deterministic for late events. For idempotent updates, I would need to derive an additional key based on the current event time when events are late and attach it to the aggregator, which is probably possible with some function(maxEventTime, actualEventTime). For that, I need maxEventTime to be stored as part of the state and recovered in case of a runtime failure. Here is my corner case: assume the whole Flink runtime crashed (auto commit on), and after recovery the first event to arrive is from the past (actually late). Without keeping the max event time in state, it may override a previous aggregate. I was wondering whether I can record my last max event time as part of the checkpoint, or run a query against the sink to find the last processed event time. Any recommendation?
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-in-Custom-Tumble-Window-Class-tp13177p13387.html
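A rough plain-Java sketch of the function(maxEventTime, actualEventTime) idea, to make the corner case concrete. Everything here is an assumption made for illustration: EventTimeAssigner, the tumbling-window arithmetic, the allowed-lateness threshold and the rule that a late event is moved to the window of the current max event time. It has no Flink dependency; in a real job the max event time would presumably be kept in checkpointed state and passed back in on restore.

```java
// Hypothetical helper: decides which tumbling window an event is aggregated into,
// moving late events to the window of the highest event time seen so far.
class EventTimeAssigner {
    private final long windowSizeMs;
    private final long allowedLatenessMs;
    private long maxEventTime; // must survive a restart for results to be repeatable

    // Pass Long.MIN_VALUE as restoredMaxEventTime when nothing was restored.
    EventTimeAssigner(long windowSizeMs, long allowedLatenessMs, long restoredMaxEventTime) {
        this.windowSizeMs = windowSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
        this.maxEventTime = restoredMaxEventTime;
    }

    // Returns the start timestamp of the window the event is assigned to.
    long windowStartFor(long eventTime) {
        if (eventTime > maxEventTime) {
            maxEventTime = eventTime;
        }
        boolean late = eventTime < maxEventTime - allowedLatenessMs;
        long effectiveTime = late ? maxEventTime : eventTime;
        return effectiveTime - Math.floorMod(effectiveTime, windowSizeMs);
    }

    // Value to persist with each checkpoint.
    long maxEventTime() {
        return maxEventTime;
    }
}
```

With one-minute windows, an event at 30000 ms that arrives after one at 125000 ms lands in the window starting at 120000. The same event arriving first after a restart lands in the window starting at 0 unless the max event time was restored, which is exactly the override described above.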
Re: invalid type code: 00
Which Flink version are you using? 1.2
What is your job doing (e.g. operators that you are using)? A ProcessFunction determines whether an event is late and, if so, changes its event time to the current time; the events are then windowed.
Which operator throws this exception? I will have to dig into it further.
Which state-backend are you using? mysql.
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/invalid-type-code-00-tp13326p13332.html
invalid type code: 00
Sporadically I am seeing this error. Any idea?
java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:664)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:651)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1381)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at java.util.HashMap.readObject(HashMap.java:1404)
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.readObject(PojoSerializer.java:130)
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:292)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy.read(TypeSerializerSerializationProxy.java:97)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:88)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:299)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:799)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
    ... 6 more
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/invalid-type-code-00-tp13326.html
Re: State in Custom Tumble Window Class
Could you elaborate on this? If I set the window time to max, does that mean my window will stay around indefinitely? Wouldn't this eventually run out of memory?
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-in-Custom-Tumble-Window-Class-tp13177p13255.html