Re: ArrayIndexOutOfBoundException on checkpoint creation

2019-12-15 Thread Theo Diefenthal
Hi, 

Just for completeness' sake: I followed Gyula's advice, and now everything 
seems to work. 
I changed my HashSet to Flink's managed MapState and replaced my LinkedList with 
a custom class of mine, which Flink recognizes as a POJO type and therefore does 
not serialize with Kryo. This new version has already survived a few stress 
tests. It really seems to be some Kryo-related race condition, which is best 
avoided by avoiding Kryo :) 
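A minimal sketch of the HashSet-to-MapState change described above. Theo's actual function is not shown in the thread, so the class name `SeenFilter`, the state name, the String key/element types and the "emit only the first occurrence" logic are all hypothetical. The point is that a MapState used as a set, declared with explicit TypeInformation, keeps both keys and values away from Kryo:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example: emits an element only the first time it is seen for the current key.
public class SeenFilter extends KeyedProcessFunction<String, String, String> {

    private transient MapState<String, Boolean> seen;

    /** MapState used as a set: the keys are the members, the Boolean value carries no meaning. */
    public static MapStateDescriptor<String, Boolean> seenDescriptor() {
        // Types.STRING and Types.BOOLEAN map to Flink's built-in serializers, not to Kryo.
        return new MapStateDescriptor<>("seen", Types.STRING, Types.BOOLEAN);
    }

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(seenDescriptor());
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Each put() stores one entry in managed state; there is no HashSet object
        // that has to be mutated and then handed back via update().
        if (!seen.contains(value)) {
            seen.put(value, Boolean.TRUE);
            out.collect(value);
        }
    }
}
```

In a job it would be attached with something like `stream.keyBy(...).process(new SeenFilter())`; the key selector depends on the pipeline.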

Best regards 
Theo 


From: "Gyula Fóra"  
To: "Theo Diefenthal"  
CC: "user"  
Sent: Friday, 29 November 2019 10:53:20 
Subject: Re: ArrayIndexOutOfBoundException on checkpoint creation 

Hi Theo! 

I have not seen this error before; however, I have encountered many strange 
things when using Kryo for serialization. From the stack trace, it seems that 
this might indeed be a Kryo-related issue. 

I am not sure what it is, but what I would try is to change the state 
serializers to non-Kryo variants. 

When you create the state, you can specify the TypeInformation object for the 
state instead of the class. You have 2 states: 

For LinkedList: 
Use the ListTypeInfo class and change your state type to List. 

For HashSet: 
There is no Set type info built into Flink, but you can replace it with a Map 
and use the MapTypeInfo to try it out before implementing a custom type info. 

Also make sure that your MyPOJO type is a proper Flink-supported POJO: all 
fields public and non-final, plus an empty constructor. 

Let me know if this makes sense :) 

Cheers, 
Gyula 


Re: ArrayIndexOutOfBoundException on checkpoint creation

2019-11-29 Thread Gyula Fóra
Hi Theo!

I have not seen this error before; however, I have encountered many strange
things when using Kryo for serialization. From the stack trace, it seems
that this might indeed be a Kryo-related issue.

I am not sure what it is, but what I would try is to change the state
serializers to non-Kryo variants.
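Before swapping serializers, it can help to confirm which types actually fall back to Kryo. One option not mentioned in the thread is Flink's `ExecutionConfig#disableGenericTypes()`: once set, creating a serializer for any type that Flink would hand to Kryo throws an `UnsupportedOperationException`. In a job this is `env.getConfig().disableGenericTypes()`, which makes the job fail fast at setup instead of at checkpoint time. The sketch below (the helper name `KryoFallbackCheck` is hypothetical) applies the same check to a single class:

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class KryoFallbackCheck {

    /** Returns true if Flink would have to fall back to Kryo to serialize the given class. */
    public static boolean needsKryo(Class<?> clazz) {
        ExecutionConfig config = new ExecutionConfig();
        // With generic types disabled, asking for a serializer of a generic (Kryo) type throws.
        config.disableGenericTypes();
        try {
            TypeInformation.of(clazz).createSerializer(config);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

Both `needsKryo(java.util.HashSet.class)` and `needsKryo(java.util.LinkedList.class)` should return true, which matches the two states discussed here, while `needsKryo(String.class)` returns false.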

When you create the state, you can specify the TypeInformation object for
the state instead of the class. You have 2 states:

For LinkedList:
Use the ListTypeInfo class and change your state type to List.

For HashSet:
There is no Set type info built into Flink, but you can replace it with a
Map and use the MapTypeInfo to try it out before implementing a custom
type info.
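A minimal sketch of the two descriptors suggested above. The thread does not show the element types, so the HashSet is assumed to hold Strings, and `MyPOJO`'s fields, the state names and the class `ExplicitStateTypes` are hypothetical. Passing a TypeInformation instead of a Class lets Flink use its own List and Map serializers:

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.MapTypeInfo;

public class ExplicitStateTypes {

    /** Hypothetical stand-in for MyPOJO; the real fields are not shown in the thread. */
    public static class MyPOJO {
        public String id;
        public long timestamp;

        public MyPOJO() {}
    }

    // Replaces LinkedList<MyPOJO>: Flink's List serializer, elements via the POJO serializer.
    public static final ListTypeInfo<MyPOJO> LIST_TYPE = new ListTypeInfo<>(MyPOJO.class);

    // Replaces HashSet<String>: the map keys are the set members, the Boolean value is ignored.
    public static final MapTypeInfo<String, Boolean> SET_AS_MAP_TYPE =
            new MapTypeInfo<>(Types.STRING, Types.BOOLEAN);

    public static ValueStateDescriptor<List<MyPOJO>> listDescriptor() {
        return new ValueStateDescriptor<>("events", LIST_TYPE);
    }

    public static ValueStateDescriptor<Map<String, Boolean>> setDescriptor() {
        return new ValueStateDescriptor<>("members", SET_AS_MAP_TYPE);
    }
}
```

Both are still ValueStates, so the read, modify, `update()` pattern stays the same; only the declared type changes from LinkedList/HashSet to List/Map.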

Also make sure that your MyPOJO type is a proper Flink-supported POJO: all
fields public and non-final, plus an empty constructor.
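One way to check whether a class meets these rules is to ask Flink's type extraction directly: a class that qualifies is described by a `PojoTypeInfo`, anything else ends up as a generic (Kryo) type. The classes `GoodEvent` and `BadEvent` and the helper `isFlinkPojo` below are hypothetical illustrations, not code from the thread:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public class PojoCheck {

    // Follows the rules above: public static class, public non-final fields,
    // public no-argument constructor.
    public static class GoodEvent {
        public String id;
        public long count;

        public GoodEvent() {}
    }

    // Only has a constructor with arguments. Flink's POJO serializer needs a public
    // no-argument constructor, so the type extractor falls back to a generic (Kryo) type.
    public static class BadEvent {
        public String id;

        public BadEvent(String id) {
            this.id = id;
        }
    }

    /** True if Flink's type extraction treats the class as a POJO. */
    public static boolean isFlinkPojo(Class<?> clazz) {
        return TypeInformation.of(clazz) instanceof PojoTypeInfo;
    }
}
```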

Let me know if this makes sense :)

Cheers,
Gyula

On Wed, Nov 27, 2019 at 2:18 PM theo.diefent...@scoop-software.de <
theo.diefent...@scoop-software.de> wrote:

> Sorry, I forgot to mention the environment.
> We use Flink 1.9.1 on a Cloudera CDH 6.3.1 cluster (with Hadoop 3.0.0, but
> using Flink shaded 2.8.3-7). Might this be a problem? As it seems to arise
> from Kryo, I doubt it.
> Our Flink uses the default configuration. Our job uses the FsStateBackend
> and exactly-once processing with a Kafka source and sink.
> Best regards,
> Theo

Re: ArrayIndexOutOfBoundException on checkpoint creation

2019-11-27 Thread theo.diefent...@scoop-software.de
Sorry, I forgot to mention the environment.
We use Flink 1.9.1 on a Cloudera CDH 6.3.1 cluster (with Hadoop 3.0.0, but using 
Flink shaded 2.8.3-7). Might this be a problem? As it seems to arise from Kryo, 
I doubt it.
Our Flink uses the default configuration. Our job uses the FsStateBackend and 
exactly-once processing with a Kafka source and sink.
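For reference, the state and checkpointing part of the setup described above might look roughly like this. It is a hypothetical reconstruction: the class name, checkpoint URI and interval are placeholders, and the Kafka source and sink are omitted. With the heap-based FsStateBackend, the state objects live on the JVM heap and are serialized by the asynchronous snapshot, which is where the Kryo call in the stack trace happens:

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CurrentSetup {

    /** Hypothetical: heap state backend with checkpoints in checkpointUri, exactly-once mode. */
    public static StreamExecutionEnvironment configure(String checkpointUri, long intervalMs) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Working state stays as Java objects on the heap; each checkpoint serializes them
        // (via Kryo for HashSet/LinkedList) in a separate asynchronous snapshot thread.
        env.setStateBackend(new FsStateBackend(checkpointUri));
        env.enableCheckpointing(intervalMs, CheckpointingMode.EXACTLY_ONCE);
        return env;
    }
}
```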
Best regards,
Theo

Re: ArrayIndexOutOfBoundException on checkpoint creation

2019-11-27 Thread Congxian Qiu
Hi

Which version are you using now? If it is an older version, could you please
check whether this exception still occurs on Flink 1.9? Also, did you try
the RocksDBStateBackend for this?
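A minimal sketch of switching to the RocksDBStateBackend, assuming the flink-statebackend-rocksdb dependency is on the classpath; the class name and checkpoint URI are placeholders. With RocksDB, each `update()` serializes the value in the task thread and stores it as bytes, so the asynchronous checkpoint no longer serializes live heap objects. That is one plausible reason it could sidestep the Kryo failure in the stack trace, although nobody in the thread confirms it:

```java
import java.io.IOException;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbSetup {

    /** Hypothetical setup: RocksDB state backend with checkpoints written to checkpointUri. */
    public static StreamExecutionEnvironment configure(String checkpointUri) throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // true enables incremental checkpoints: only new or changed SST files are uploaded.
        env.setStateBackend(new RocksDBStateBackend(checkpointUri, true));
        return env;
    }
}
```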

Best,
Congxian


On Tue, Nov 26, 2019 at 6:52 PM Theo Diefenthal wrote:

> Hi,
>
> We have a pipeline with a custom ProcessFunction and state (see [1],
> implemented as suggested by Fabian with a ValueState and
> ValueState>).
> The function works fine in our unit tests and under low load in our test
> environment (100,000 records per minute). In the production environment,
> however, we observe reproducible crashes like the attached one.
> Any idea what could cause this out-of-bounds exception? Every time we read
> the state and modify it, we are certain that .update() is called:

ArrayIndexOutOfBoundException on checkpoint creation

2019-11-26 Thread Theo Diefenthal
Hi, 

We have a pipeline with a custom ProcessFunction and state (see [1], 
implemented as suggested by Fabian with a ValueState and 
ValueState>). 
The function works fine in our unit tests and under low load in our test 
environment (100,000 records per minute). In the production environment, 
however, we observe reproducible crashes like the one below. 
Any idea what could cause this out-of-bounds exception? Every time we read the 
state and modify it, we are certain that .update() is called: 

2019-11-26T11:26:55+01:00 host19 java.lang.Exception: Could not materialize checkpoint 7 for operator our_operator) (4/8).
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
2019-11-26T11:26:55+01:00 host19 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
2019-11-26T11:26:55+01:00 host19 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
2019-11-26T11:26:55+01:00 host19 at java.lang.Thread.run(Thread.java:745)
2019-11-26T11:26:55+01:00 host19 Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: 67108864
2019-11-26T11:26:55+01:00 host19 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
2019-11-26T11:26:55+01:00 host19 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
2019-11-26T11:26:55+01:00 host19 ... 3 more
2019-11-26T11:26:55+01:00 host19 Caused by: java.lang.ArrayIndexOutOfBoundsException: 67108864
2019-11-26T11:26:55+01:00 host19 at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
2019-11-26T11:26:55+01:00 host19 at com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
2019-11-26T11:26:55+01:00 host19 at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
2019-11-26T11:26:55+01:00 host19 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:116)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
2019-11-26T11:26:55+01:00 host19 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
2019-11-26T11:26:55+01:00 host19 ... 5 more
2019-11-26T11:26:55+01:00 host18 WARN  org.apache.hadoop.hdfs.DataStreamer - DataStreamer Exception
2019-11-26T11:26:55+01:00 host18 java.io.FileNotFoundException: File does not exist: /.../STATE/CHECKPOINTS/0a2e111b3a800aae0d3b49f33e0db6f3/chk-7/3da2a0a4-f5ef-4e8c-bc1a-9fe892cb0b18 (inode 577546140) Holder DFSClient_NONMAPREDUCE_-1714419242_95 does not have any open files.
2019-11-26T11:26:55+01:00 host18 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2782)
2019-11-26T11:26:55+01:00 host18 at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:599)
2019-11-26T11:26:55+01:00 host18 at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:171)
2019-11-26T11:26:55+01:00 host18 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2661)
2019-11-26T11:26:55+01:00 host18 at