Error while deserializing the element
Setup specifics:
Version: 1.6.2
RocksDB Map State
Timers stored in RocksDB

When this job has been running for a long period of time (more than 30 days) and then restarts for some reason, we encounter "Error while deserializing the element". Is this a known issue that is fixed in later versions? I see some code changes for FLINK-10175, but we don't use any queryable state.

Below is the stack trace:

org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element.
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
    at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at org.apache.flink.types.StringValue.readString(StringValue.java:769)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
    ... 20 more

--
Thanks,
-Vijay
Re: Loading Rules from compacted Kafka Topic - open() vs Connected Streams
What I was trying to achieve above is similar to GlobalKTable in Kafka Streams:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

Also, the Flink version I am currently using is 1.4.

Are there any other suggestions or guidance for achieving GlobalKTable functionality in Flink?

Thanks.

On Thu, Jul 12, 2018 at 1:00 PM vijayakumar palaniappan <vijayakuma...@gmail.com> wrote:

> Hello All,
>
> I can think of two options of implementing below requirement and request
> some guidance on choosing the option with pros and cons.
>
> Requirements:
> - A in memory rules cache to be loaded from log compacted kafka topic.
>   This cache has to be loaded prior to arrival of events.
> - Updates to the log compacted kafka topic has to be tracked to keep the
>   in memory rule cache up to date
>
> Additional properties of data:
> - On Job start/restart, this rule cache is always loaded from earliest
>   available offset in the log.
> - No kafka offset store and restore required.
> - No checkpointing needed for the rule cache, as it is loaded afresh in
>   event of crash and restore
> - No eventTime semantics required as we always want the latest rules to be
>   loaded to cache
>
> Implementation Options:
>
> 1. Using a KafkaConsumer in open() doing a initial load, and continuously
>    fetching rule updates and keeping the in memory cache up to date. This
>    option is not using a DataStream for rules as we don't use any goodies of
>    stream like state,checkpoint, event time etc.
> 2. Connected Stream approach. Using a KafkaConsumer in open() doing a
>    initial load. Have a FlinkKafkaSource Stream connected with events. In this
>    case have to take care of out of order updates to caches, since the rules
>    updates are from open() and Rule DataStream.
>
> --
> Thanks,
> -Vijay

--
Thanks,
-Vijay
Loading Rules from compacted Kafka Topic - open() vs Connected Streams
Hello All,

I can think of two options for implementing the requirement below, and I would like some guidance on choosing between them, with pros and cons.

Requirements:
- An in-memory rules cache is to be loaded from a log-compacted Kafka topic. This cache has to be loaded before any events arrive.
- Updates to the log-compacted Kafka topic have to be tracked to keep the in-memory rule cache up to date.

Additional properties of the data:
- On job start/restart, the rule cache is always loaded from the earliest available offset in the log.
- No Kafka offset store and restore is required.
- No checkpointing is needed for the rule cache, as it is loaded afresh in the event of a crash and restore.
- No event-time semantics are required, as we always want the latest rules to be loaded into the cache.

Implementation options:

1. Use a KafkaConsumer in open() to do an initial load, then continuously fetch rule updates and keep the in-memory cache up to date. This option does not use a DataStream for rules, as we don't use any of the stream goodies such as state, checkpointing, event time, etc.
2. Connected stream approach. Use a KafkaConsumer in open() to do an initial load, and have a FlinkKafkaSource stream connected with the events. In this case we have to take care of out-of-order updates to the cache, since rule updates come from both open() and the rule DataStream.

--
Thanks,
-Vijay
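One way to handle the out-of-order updates mentioned in option 2 is to remember, per rule key, the Kafka offset of the last update applied and to ignore anything older. The following is a minimal sketch of that idea in plain Java, not an implementation of either option. The class `RuleCache` and its methods are hypothetical names, rules are represented as plain strings, and it assumes that all updates for a given rule key land in the same partition, so that offsets for one key are comparable.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory rule cache; not part of the Flink or Kafka APIs. */
final class RuleCache {

    /** A rule value together with the Kafka offset it was read from. */
    private static final class Versioned {
        final long offset;
        final String rule; // null represents a tombstone (deleted rule)

        Versioned(long offset, String rule) {
            this.offset = offset;
            this.rule = rule;
        }
    }

    private final Map<String, Versioned> rules = new HashMap<>();

    /**
     * Applies an update unless an update with the same or a higher offset
     * was already applied for this key. Assumes all updates for one key
     * come from the same partition, so their offsets are comparable.
     *
     * @return true if the update was applied, false if it was stale
     */
    boolean apply(String ruleId, long offset, String rule) {
        Versioned current = rules.get(ruleId);
        if (current != null && current.offset >= offset) {
            return false; // stale or duplicate update, keep the newer value
        }
        // Tombstones are stored too, so an older update cannot resurrect
        // a rule that was deleted later in the log.
        rules.put(ruleId, new Versioned(offset, rule));
        return true;
    }

    /** Returns the latest rule for the key, or null if unknown or deleted. */
    String get(String ruleId) {
        Versioned current = rules.get(ruleId);
        return current == null ? null : current.rule;
    }
}
```

With this shape, both the initial load in open() and the connected rule stream could call `apply(...)` with the record's offset, and whichever path delivers an older record second is simply ignored.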
Task Slots allocation
I am using Flink 1.4.0 in standalone cluster mode.

I have a job with a graph like this:

Source (parallelism 3) -> Filter (parallelism 3) -> Map (parallelism 1) -> Sink (parallelism 3)

As I understand it, the maximum number of task slots used should be bounded by the maximum parallelism, which in this case is 3. What I observe is that when more slots than required (say 10) are available in the cluster, this job uses 5 slots.

I tried setting the same slot sharing group on all stream nodes, but that didn't help.

Is this a change in behavior in 1.4.0? If so, are there any workarounds to force the old behavior?

--
Thanks,
-Vijay
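The expectation of 3 slots can be written out as a small calculation. This is only a sketch of the arithmetic behind that expectation, assuming each slot sharing group needs as many slots as its highest operator parallelism and that groups do not share slots with each other. `SlotEstimate` and `requiredSlots` are hypothetical names, not Flink API, and the sketch does not explain the 5 slots observed.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating the expected slot count; not Flink API. */
final class SlotEstimate {

    /**
     * Sums, over all slot sharing groups, the highest operator parallelism
     * in each group. Assumes operators in one group can share slots and
     * that different groups never share a slot.
     */
    static int requiredSlots(Map<String, List<Integer>> parallelismByGroup) {
        int total = 0;
        for (List<Integer> parallelisms : parallelismByGroup.values()) {
            int max = 0;
            for (int p : parallelisms) {
                max = Math.max(max, p);
            }
            total += max;
        }
        return total;
    }

    public static void main(String[] args) {
        // Source(3) -> Filter(3) -> Map(1) -> Sink(3), all in one group.
        Map<String, List<Integer>> job = new LinkedHashMap<>();
        job.put("default", Arrays.asList(3, 3, 1, 3));
        System.out.println(requiredSlots(job)); // prints 3
    }
}
```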
Re: Incremental RocksDB checkpointing
I observed the job for 18 hours; the checkpoint size went from 118 KB to 1.10 MB.

I am using Flink version 1.3.0.

On Fri, Dec 1, 2017 at 11:39 AM, Stefan Richter wrote:

> Maybe one more question: is the size always increasing, or will it also
> reduce eventually? Over what period of time did you observe growth? From
> the way how RocksDB works, it does persist updates in a way that is
> sometimes closer to a log than in-place updates. So it is perfectly
> possible that you can observe a growing state for some time. Eventually, if
> the state reaches a critical mass, RocksDB will consolidate and prune the
> written state and that is the time when you should also observe a drop in
> size.
>
> From what it seems, your use case is working with a very small state, so if
> this is not just a test you should reconsider if this is the right use-case
> for a) incremental checkpoints and b) RocksDB at all.
>
> > On 01.12.2017 at 16:34, vijayakumar palaniappan <vijayakuma...@gmail.com> wrote:
> >
> > I have simple event time window aggregate count function with
> > incremental checkpointing enabled. The checkpoint size keeps increasing
> > over a period of time, even though my input data has a single key and data
> > is flowing at a constant rate.
> >
> > When i turn off incremental checkpointing, checkpoint size remains
> > constant?
> >
> > Is there are any switches i need to enable or is this a bug?
> >
> > --
> > Thanks,
> > -Vijay

--
Thanks,
-Vijay
Incremental RocksDB checkpointing
I have a simple event-time window aggregate count function with incremental checkpointing enabled. The checkpoint size keeps increasing over time, even though my input data has a single key and data is flowing at a constant rate.

When I turn off incremental checkpointing, the checkpoint size remains constant.

Are there any switches I need to enable, or is this a bug?

--
Thanks,
-Vijay
REST Endpoint for Triggering Savepoint
Hi,

Is there a REST endpoint for triggering a savepoint without cancelling the job?

--
Thanks,
-Vijay
Windowed Stream Queryable State Support
What is the state of Windowed Stream Queryable State Support? Is it available in 1.3 or planned for 1.4?

Thanks,
Vijay
CEP Pattern detection progressively slower
When using pattern detection on a KeyedStream, the list of unique keys encountered keeps accumulating in AbstractKeyedCEPPatternOperator.java. Each time a watermark arrives, these accumulated keys are looped over to detect patterns. This causes spikes in CPU usage, and processing gets progressively slower.

Is this a known issue? Are there any workarounds?

This happens with version 1.2.0.

--
Thanks,
-Vijay
CEP timeout does not trigger under certain conditions
The timeout pattern does not trigger under certain conditions. The preconditions are as follows:

- Assume a pattern of Event A followed by Event B within 2 seconds
- Periodic watermarks every 1 second
- Assume the following events have arrived:
  - Event A-1 [time: 1 sec]
  - Event B-1 [time: 2 sec]
  - Event A-2 [time: 2 sec]
  - Event A-3 [time: 5 sec]
  - Watermark [time: 5 sec]

I would expect that after the watermark arrives, the match A-1, B-1 is detected and A-2 times out. But the A-2 timeout does not happen.

If I use a punctuated watermark and generate a watermark for every event, it seems to work as expected.

To me this seems to be a bug. Is my understanding correct?

--
Thanks,
-Vijay
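The expectation that A-2 should time out can be checked with simple arithmetic: A-2 starts at 2 seconds, the window is 2 seconds, so it expires at 4 seconds, which is before the watermark at 5 seconds. The sketch below only illustrates that arithmetic. `TimeoutCheck` and `shouldTimeOut` are hypothetical names, not Flink CEP API, and the inclusive comparison at the window boundary is an assumption.

```java
/** Hypothetical helper illustrating the expected timeout; not Flink CEP API. */
final class TimeoutCheck {

    /**
     * Returns true if a partial match that started at startMillis should be
     * considered timed out once the watermark reaches watermarkMillis.
     * The inclusive boundary (>=) is an assumption of this sketch.
     */
    static boolean shouldTimeOut(long startMillis, long withinMillis, long watermarkMillis) {
        return watermarkMillis - startMillis >= withinMillis;
    }

    public static void main(String[] args) {
        long within = 2_000L;    // "within 2 seconds"
        long watermark = 5_000L; // watermark at 5 sec

        // A-2 started at 2 sec: 5 - 2 = 3 sec elapsed, so a timeout is expected.
        System.out.println("A-2 timed out: " + shouldTimeOut(2_000L, within, watermark));
        // A-3 started at 5 sec: 0 sec elapsed, so it is still pending.
        System.out.println("A-3 timed out: " + shouldTimeOut(5_000L, within, watermark));
    }
}
```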