Error while deserializing the element

2021-08-18 Thread vijayakumar palaniappan
Setup specifics:
- Version: 1.6.2
- RocksDB map state
- Timers stored in RocksDB

When this job has been running for a long period of time (> 30 days) and
restarts for some reason, we encounter "Error while deserializing the
element". Is this a known issue that was fixed in later versions? I see some
code changes for FLINK-10175, but we don't use any queryable state.

Below is the stack trace:

org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element.
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
    at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at org.apache.flink.types.StringValue.readString(StringValue.java:769)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
    ... 20 more

-- 
Thanks,
-Vijay


Re: Loading Rules from compacted Kafka Topic - open() vs Connected Streams

2018-07-13 Thread vijayakumar palaniappan
What I was trying to achieve above is similar to GlobalKTable in Kafka Streams:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams
The Flink version I am currently using is 1.4.

Are there any other suggestions or guidance on achieving GlobalKTable-like
functionality in Flink?
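
For context, the rules source I have in mind always re-reads the compacted
topic from the beginning on every (re)start. A minimal sketch, assuming the
Kafka 0.11 connector available in Flink 1.4; Rule, RuleSchema, and the topic
name are placeholders:

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    // group.id is required by the consumer, but committed offsets are never used
    kafkaProps.setProperty("group.id", "rules-loader");

    FlinkKafkaConsumer011<Rule> rulesSource =
            new FlinkKafkaConsumer011<>("rules-topic", new RuleSchema(), kafkaProps);
    // Ignore any committed offsets; always rebuild the cache from the full log.
    rulesSource.setStartFromEarliest();

    DataStream<Rule> rules = env.addSource(rulesSource);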

Thanks.

On Thu, Jul 12, 2018 at 1:00 PM vijayakumar palaniappan <
vijayakuma...@gmail.com> wrote:

> Hello All,
> I can think of two options for implementing the requirement below and would
> like some guidance on choosing between them, with pros and cons.
>
> Requirements:
> - An in-memory rules cache to be loaded from a log-compacted Kafka topic.
> This cache has to be loaded prior to the arrival of events.
> - Updates to the log-compacted Kafka topic have to be tracked to keep the
> in-memory rule cache up to date.
>
> Additional properties of the data:
> - On job start/restart, the rule cache is always loaded from the earliest
> available offset in the log, so no Kafka offset store and restore is
> required.
> - No checkpointing is needed for the rule cache, as it is loaded afresh in
> the event of a crash and restore.
> - No event-time semantics are required, as we always want the latest rules
> loaded into the cache.
>
> Implementation options:
>
> 1. Use a KafkaConsumer in open() to do the initial load, then continuously
> fetch rule updates to keep the in-memory cache up to date. This option does
> not use a DataStream for rules, since we don't need any of the stream
> goodies like state, checkpointing, event time, etc.
> 2. Connected-stream approach: use a KafkaConsumer in open() to do the
> initial load, and have a Flink Kafka source stream of rules connected with
> the event stream. In this case we have to take care of out-of-order updates
> to the cache, since rule updates arrive from both open() and the rule
> DataStream.
>
> --
> Thanks,
> -Vijay
>


-- 
Thanks,
-Vijay


Loading Rules from compacted Kafka Topic - open() vs Connected Streams

2018-07-12 Thread vijayakumar palaniappan
Hello All,
I can think of two options for implementing the requirement below and request
some guidance on choosing between them, with pros and cons.

Requirements:
- An in-memory rules cache to be loaded from a log-compacted Kafka topic.
This cache has to be loaded prior to the arrival of events.
- Updates to the log-compacted Kafka topic have to be tracked to keep the
in-memory rule cache up to date.

Additional properties of the data:
- On job start/restart, the rule cache is always loaded from the earliest
available offset in the log, so no Kafka offset store and restore is required.
- No checkpointing is needed for the rule cache, as it is loaded afresh in
the event of a crash and restore.
- No event-time semantics are required, as we always want the latest rules
loaded into the cache.

Implementation options:

1. Use a KafkaConsumer in open() to do the initial load, then continuously
fetch rule updates to keep the in-memory cache up to date. This option does
not use a DataStream for rules, since we don't need any of the stream goodies
like state, checkpointing, event time, etc.
2. Connected-stream approach: use a KafkaConsumer in open() to do the initial
load, and have a Flink Kafka source stream of rules connected with the event
stream. In this case we have to take care of out-of-order updates to the
cache, since rule updates arrive from both open() and the rule DataStream
(see the sketch below).
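
For option 2, the connected-stream operator could look roughly like this. A
minimal sketch only: Event, Rule, EnrichedEvent, and RulesLoader (a helper
wrapping a plain KafkaConsumer that reads the compacted topic from the
earliest offset) are placeholders, not existing classes.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class RuleEnrichment extends RichCoFlatMapFunction<Event, Rule, EnrichedEvent> {

        // Local, non-checkpointed cache; rebuilt from the compacted topic on every restart.
        private transient Map<String, Rule> ruleCache;

        @Override
        public void open(Configuration parameters) {
            ruleCache = new HashMap<>();
            // Hypothetical helper: reads the compacted topic from the earliest
            // offset and returns the latest rule per key.
            for (Rule rule : RulesLoader.loadSnapshot("rules-topic")) {
                ruleCache.put(rule.getKey(), rule);
            }
        }

        @Override
        public void flatMap1(Event event, Collector<EnrichedEvent> out) {
            Rule rule = ruleCache.get(event.getRuleKey());
            if (rule != null) {
                out.collect(new EnrichedEvent(event, rule));
            }
        }

        @Override
        public void flatMap2(Rule update, Collector<EnrichedEvent> out) {
            // Updates from the rule stream may race with the open() snapshot;
            // last write wins, which is exactly the out-of-order concern above.
            ruleCache.put(update.getKey(), update);
        }
    }

Wired up as events.connect(ruleUpdates.broadcast()).flatMap(new RuleEnrichment()),
so every parallel instance sees all rule updates.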

-- 
Thanks,
-Vijay


Task Slots allocation

2018-03-04 Thread vijayakumar palaniappan
I am using Flink 1.4.0 in standalone cluster mode.

I have a job with a graph like Source (parallelism 3) ->
Filter (parallelism 3) -> Map (parallelism 1) -> Sink (parallelism 3).

As per my understanding, the number of task slots used should be bounded by
the max parallelism of the job, which in this case is 3.

The behavior I observe is that, when more than the required number of slots
(say 10) are available in the cluster, this job uses 5 slots. I attempted
setting the same slot sharing group on all stream nodes, but that didn't help
(see the sketch below).

Is this a change in behavior in 1.4.0? If so, are there any workarounds to
force the old behavior?
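
For reference, this is roughly what I attempted. A minimal sketch only,
assuming MySource and MySink implement SourceFunction<String> and
SinkFunction<String>; the function bodies are placeholders:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Pin every operator to the same slot sharing group ("default" is the
    // implicit group), expecting all subtasks to co-locate into 3 slots.
    env.addSource(new MySource()).setParallelism(3).slotSharingGroup("default")
            .filter(value -> true).setParallelism(3).slotSharingGroup("default")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value;
                }
            }).setParallelism(1).slotSharingGroup("default")
            .addSink(new MySink()).setParallelism(3).slotSharingGroup("default");

    env.execute("slot-sharing-test");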



-- 
Thanks,
-Vijay


Re: Incremental RocksDB checkpointing

2017-12-01 Thread vijayakumar palaniappan
I observed the job for 18 hrs; the checkpoint size went from 118 KB to 1.10 MB.

I am using Flink version 1.3.0.

On Fri, Dec 1, 2017 at 11:39 AM, Stefan Richter  wrote:

> Maybe one more question: is the size always increasing, or will it also
> reduce eventually? Over what period of time did you observe the growth? Due
> to the way RocksDB works, it persists updates in a way that is sometimes
> closer to a log than to in-place updates. So it is perfectly possible to
> observe a growing state for some time. Eventually, once the state reaches a
> critical mass, RocksDB will consolidate and prune the written state, and
> that is when you should also observe a drop in size.
>
> From what it seems, your use case is working with a very small state, so if
> this is not just a test, you should reconsider whether this is the right
> use case for a) incremental checkpoints and b) RocksDB at all.
>
> > On 01.12.2017 at 16:34, vijayakumar palaniappan <
> > vijayakuma...@gmail.com> wrote:
> >
> > I have a simple event-time window aggregate count function with
> > incremental checkpointing enabled. The checkpoint size keeps increasing
> > over a period of time, even though my input data has a single key and
> > data is flowing at a constant rate.
> >
> > When I turn off incremental checkpointing, the checkpoint size remains
> > constant.
> >
> > Are there any switches I need to enable, or is this a bug?
> >
> > --
> > Thanks,
> > -Vijay
>
>


-- 
Thanks,
-Vijay


Incremental RocksDB checkpointing

2017-12-01 Thread vijayakumar palaniappan
I have a simple event-time window aggregate count function with incremental
checkpointing enabled. The checkpoint size keeps increasing over a period of
time, even though my input data has a single key and data is flowing at a
constant rate.

When I turn off incremental checkpointing, the checkpoint size remains
constant.

Are there any switches I need to enable, or is this a bug?
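
For context, incremental checkpointing is enabled like this on the job. A
minimal sketch, with a placeholder checkpoint URI:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IncrementalCheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000L); // checkpoint every 60 seconds
            // The second constructor argument switches on incremental checkpoints (Flink 1.3+).
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
            // ... event-time window aggregation and env.execute(...) omitted
        }
    }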

-- 
Thanks,
-Vijay


REST Endpoint for Triggering Savepoint

2017-11-07 Thread vijayakumar palaniappan
Hi,

Is there a REST endpoint for triggering a savepoint without cancelling the
job?

-- 
Thanks,
-Vijay


Windowed Stream Queryable State Support

2017-10-07 Thread vijayakumar palaniappan
What is the status of queryable state support for windowed streams?

Is it available in 1.3, or is it planned for 1.4?

Thanks
Vijay


CEP Pattern detection progressively slower

2017-04-11 Thread vijayakumar palaniappan
While using pattern detection on a KeyedStream, the list of unique keys
encountered keeps accumulating in AbstractKeyedCEPPatternOperator.java. On
encountering watermarks, these accumulated keys are looped over to detect
patterns. This causes a spike in CPU usage and makes pattern detection
progressively slower.

Is this a known issue?
Are there any workarounds?

This happens with version 1.2.0.

-- 
Thanks,
-Vijay


CEP timeout does not trigger under certain conditions

2017-04-11 Thread vijayakumar palaniappan
The TimeoutPattern does not trigger under certain conditions. The
preconditions are as follows:

- Assume a pattern of event A followed by event B within 2 seconds
- Periodic watermarks every 1 second
- Assume the following events have arrived:

- Event A-1 [time: 1 sec]
- Event B-1 [time: 2 sec]
- Event A-2 [time: 2 sec]
- Event A-3 [time: 5 sec]
- Watermark [time: 5 sec]

I would expect that after the watermark arrives, the pair A-1/B-1 is detected
as a match and A-2 is timed out. But the A-2 timeout does not happen.

If I use a punctuated watermark and generate a watermark for every event, it
seems to work as expected.

To me this seems to be a bug; is my understanding correct?
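
For reference, the pattern in question looks roughly like this on the
1.2-era CEP API. A minimal fragment (imports omitted); the Event POJO with
its getType() accessor and the keyedInput stream are placeholders:

    Pattern<Event, ?> pattern = Pattern.<Event>begin("A")
            .where(new FilterFunction<Event>() {
                @Override
                public boolean filter(Event evt) {
                    return "A".equals(evt.getType());
                }
            })
            .followedBy("B")
            .where(new FilterFunction<Event>() {
                @Override
                public boolean filter(Event evt) {
                    return "B".equals(evt.getType());
                }
            })
            .within(Time.seconds(2));

    // Timed-out partial matches arrive as Left, full matches as Right.
    DataStream<Either<String, String>> result = CEP.pattern(keyedInput, pattern).select(
            new PatternTimeoutFunction<Event, String>() {
                @Override
                public String timeout(Map<String, Event> match, long timeoutTimestamp) {
                    return "timed out: " + match.get("A");
                }
            },
            new PatternSelectFunction<Event, String>() {
                @Override
                public String select(Map<String, Event> match) {
                    return "matched: " + match.get("A") + " -> " + match.get("B");
                }
            });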

-- 
Thanks,
-Vijay