Re: NegativeArraySizeException trying to take a savepoint

2022-06-14 Thread Mike Barborak
Thank you for your replies.

Upgrading is in our plans, but I think Yun is saying that might not help.

We are still trying to find out which part of the savepoint is causing the error. We will try removing pieces of the job graph until we are able to take a savepoint.

From: Yun Tang 
Date: Tuesday, June 14, 2022 at 9:15 AM
To: Martijn Visser , Mike Barborak 
Cc: user@flink.apache.org 
Subject: Re: NegativeArraySizeException trying to take a savepoint
Hi Mike,

I think the root cause is that the size of the Java byte array still exceeds the VM limit.
The exception message is not friendly, and it is not covered by the sanity check [1] because it takes a different code path [2]:
the native method org.rocksdb.RocksIterator.$$YJP$$value0 allocates the byte array directly without that check.

If you want to work around the problem, please consider reducing the size of the state written via listState#add so that no single value becomes too large.



[1] https://github.com/facebook/rocksdb/pull/3850
[2] https://github.com/ververica/frocksdb/blob/8608d75d85f8e1b3b64b73a4fb6d19baec61ba5c/java/rocksjni/iterator.cc#L239-L245

Best
Yun Tang


From: Martijn Visser 
Sent: Monday, June 13, 2022 21:47
To: Mike Barborak 
Cc: user@flink.apache.org 
Subject: Re: NegativeArraySizeException trying to take a savepoint

Hi Mike,

It would be worthwhile to check if this still occurs in Flink 1.14, since Flink 
bumped to a newer version of RocksDB in that version. Is that a possibility for 
you?

Best regards,

Martijn

On Mon, 13 Jun 2022 at 15:21, Mike Barborak <mi...@ec.ai> wrote:

When trying to savepoint our job, we are getting the stack trace below. Is 
there a way to know more about this failure? Like which function in the job 
graph is associated with the problematic state and which key (assuming it is 
keyed state)?



Or is there a fix for this exception? The only mention of this exception that I 
can find is in [1] and [2]. [1] has a message at the bottom saying that the 
issue was fixed in RocksDB in 2018. And while we do have a part of the job 
graph that matches the pattern discussed in these two links, our attempts to 
reproduce the problem by pumping messages through at a rate millions of times 
higher than normal have not worked.



We are using Flink version 1.13.5.



Thanks,

Mike



[1] https://issues.apache.org/jira/browse/FLINK-9268

[2] https://www.mail-archive.com/user@flink.apache.org/msg34915.html



Caused by: java.lang.Exception: Could not materialize checkpoint 49768 for 
operator KeyedProcess -> KeyedProcess -> re-operator-output -> Sink: Kafka sink 
to ec.platform.braid.responses-rtw (9/15)#0.

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)

... 4 more

Caused by: java.util.concurrent.ExecutionException: 
java.lang.NegativeArraySizeException: -785722504

at java.base/java.util.concurrent.FutureTask.report(Unknown 
Source)

at java.base/java.util.concurrent.FutureTask.get(Unknown Source)

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)

at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinal

Re: NegativeArraySizeException trying to take a savepoint

2022-06-14 Thread Yun Tang
Hi Mike,

I think the root cause is that the size of the Java byte array still exceeds the VM limit.
The exception message is not friendly, and it is not covered by the sanity check [1] because it takes a different code path [2]:
the native method org.rocksdb.RocksIterator.$$YJP$$value0 allocates the byte array directly without that check.

If you want to work around the problem, please consider reducing the size of the state written via listState#add so that no single value becomes too large.
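
One way to keep individual values small, assuming the job accumulates per-key elements through a ListState, is to hold the elements in a MapState keyed by an index instead: with the RocksDB state backend, a ListState keeps the entire list for a key as a single RocksDB value, while each MapState entry is stored as its own RocksDB key-value pair. (For reference, -785722504 read back as an unsigned 32-bit size would be roughly 3.5 GB, which would be consistent with a single value overflowing the 2^31-1 byte limit of a Java byte array, though that reading is an assumption.) The sketch below is illustrative only; IndexedElementsFunction, Event, and the state names are placeholders, not taken from this job.

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Placeholder element type; stands in for whatever the real job stores per key.
    class Event {
        public String payload;
    }

    public class IndexedElementsFunction extends KeyedProcessFunction<String, Event, Void> {

        // Each map entry is a separate RocksDB key-value pair, so no single
        // stored value grows with the number of elements kept for a key.
        private transient MapState<Long, Event> elements;
        private transient ValueState<Long> nextIndex;

        @Override
        public void open(Configuration parameters) {
            elements = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("elements", Long.class, Event.class));
            nextIndex = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("nextIndex", Long.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<Void> out) throws Exception {
            Long index = nextIndex.value();
            long i = (index == null) ? 0L : index;
            elements.put(i, event);       // replaces listState.add(event)
            nextIndex.update(i + 1);
        }
    }

Reading the elements back via elements.values() or elements.entries() still works, but iteration order over a MapState is not guaranteed to match insertion order, so a job that depends on the order of the original list would need to iterate by index explicitly.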



[1] https://github.com/facebook/rocksdb/pull/3850
[2] 
https://github.com/ververica/frocksdb/blob/8608d75d85f8e1b3b64b73a4fb6d19baec61ba5c/java/rocksjni/iterator.cc#L239-L245

Best
Yun Tang


From: Martijn Visser 
Sent: Monday, June 13, 2022 21:47
To: Mike Barborak 
Cc: user@flink.apache.org 
Subject: Re: NegativeArraySizeException trying to take a savepoint

Hi Mike,

It would be worthwhile to check if this still occurs in Flink 1.14, since Flink 
bumped to a newer version of RocksDB in that version. Is that a possibility for 
you?

Best regards,

Martijn

On Mon, 13 Jun 2022 at 15:21, Mike Barborak <mi...@ec.ai> wrote:

When trying to savepoint our job, we are getting the stack trace below. Is 
there a way to know more about this failure? Like which function in the job 
graph is associated with the problematic state and which key (assuming it is 
keyed state)?



Or is there a fix for this exception? The only mention of this exception that I 
can find is in [1] and [2]. [1] has a message at the bottom saying that the 
issue was fixed in RocksDB in 2018. And while we do have a part of the job 
graph that matches the pattern discussed in these two links, our attempts to 
reproduce the problem by pumping messages through at a rate millions of times 
higher than normal have not worked.



We are using Flink version 1.13.5.



Thanks,

Mike



[1] https://issues.apache.org/jira/browse/FLINK-9268

[2] https://www.mail-archive.com/user@flink.apache.org/msg34915.html



Caused by: java.lang.Exception: Could not materialize checkpoint 49768 for 
operator KeyedProcess -> KeyedProcess -> re-operator-output -> Sink: Kafka sink 
to ec.platform.braid.responses-rtw (9/15)#0.

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)

... 4 more

Caused by: java.util.concurrent.ExecutionException: 
java.lang.NegativeArraySizeException: -785722504

at java.base/java.util.concurrent.FutureTask.report(Unknown 
Source)

at java.base/java.util.concurrent.FutureTask.get(Unknown Source)

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)

at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128)

... 3 more

Caused by: java.lang.NegativeArraySizeException: -785722504

at org.rocksdb.RocksIterator.$$YJP$$value0(Native Method)

at org.rocksdb.RocksIterator.value0(RocksIterator.java)

at org.rocksdb.RocksIterator.value(RocksIterator.java:50)

at 
org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:103)

at 
org.apache.flink.contrib.streaming.state.iterator.RocksSingleStateIterator.value(RocksSingleStateIterator.java:66)

at 
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:202)

at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeKVStateData(FullSnapshotAsyncWriter.java:210)

at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeSnapshotToOutputStream(FullSnapshotAsyncWriter.java:107)

at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:77)

at 
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)

at 
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)

at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)

at java.base/java.util.concurrent.FutureTask.run(Unknown Source)

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)


Re: NegativeArraySizeException trying to take a savepoint

2022-06-13 Thread Martijn Visser
Hi Mike,

It would be worthwhile to check if this still occurs in Flink 1.14, since
Flink bumped to a newer version of RocksDB in that version. Is that a
possibility for you?

Best regards,

Martijn

On Mon, 13 Jun 2022 at 15:21, Mike Barborak wrote:

> When trying to savepoint our job, we are getting the stack trace below. Is
> there a way to know more about this failure? Like which function in the job
> graph is associated with the problematic state and which key (assuming it
> is keyed state)?
>
>
>
> Or is there a fix for this exception? The only mention of this exception
> that I can find is in [1] and [2]. [1] has a message at the bottom saying
> that the issue was fixed in RocksDB in 2018. And while we do have a part of
> the job graph that matches the pattern discussed in these two links, our
> attempts to reproduce the problem by pumping messages through at a rate
> millions of times higher than normal have not worked.
>
>
>
> We are using Flink version 1.13.5.
>
>
>
> Thanks,
>
> Mike
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-9268
>
> [2] https://www.mail-archive.com/user@flink.apache.org/msg34915.html
>
>
>
> Caused by: java.lang.Exception: Could not materialize checkpoint 49768 for
> operator KeyedProcess -> KeyedProcess -> re-operator-output -> Sink: Kafka
> sink to ec.platform.braid.responses-rtw (9/15)#0.
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)
>
> ... 4 more
>
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NegativeArraySizeException: -785722504
>
> at
> java.base/java.util.concurrent.FutureTask.report(Unknown Source)
>
> at java.base/java.util.concurrent.FutureTask.get(Unknown
> Source)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
>
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128)
>
> ... 3 more
>
> Caused by: java.lang.NegativeArraySizeException: -785722504
>
> at org.rocksdb.RocksIterator.$$YJP$$value0(Native Method)
>
> at org.rocksdb.RocksIterator.value0(RocksIterator.java)
>
> at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>
> at
> org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:103)
>
> at
> org.apache.flink.contrib.streaming.state.iterator.RocksSingleStateIterator.value(RocksSingleStateIterator.java:66)
>
> at
> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:202)
>
> at
> org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeKVStateData(FullSnapshotAsyncWriter.java:210)
>
> at
> org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeSnapshotToOutputStream(FullSnapshotAsyncWriter.java:107)
>
> at
> org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:77)
>
> at
> org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
>
> at
> org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
>
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
>
> at java.base/java.util.concurrent.FutureTask.run(Unknown
> Source)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)
>