Re: ArrayIndexOutOfBoundException on checkpoint creation
Hi,

Just for completeness' sake: I followed Gyula's advice and now everything seems to work. I changed my HashSet to Flink's managed MapState and changed my LinkedList to a custom class of mine, which Flink recognizes as a POJO type and which is therefore not serialized with Kryo. This new version has already survived a few stress tests. It really seems to be some Kryo / race condition, which is best avoided by avoiding Kryo :)

Best regards
Theo

From: "Gyula Fóra"
To: "Theo Diefenthal"
CC: "user"
Sent: Friday, November 29, 2019 10:53:20
Subject: Re: ArrayIndexOutOfBoundException on checkpoint creation

Hi Theo!

I have not seen this error before; however, I have encountered many strange things when using Kryo for serialization. From the stack trace, it seems that this might indeed be a Kryo-related issue.

I am not sure what it is, but what I would try is to change the state serializers to non-Kryo variants.

When you create the state, you can specify the TypeInformation object for the state instead of the class.

You have 2 states:
For LinkedList: use the ListTypeInfo class and change your state type to List.
For HashSet: there is no Set type info built into Flink, but you can replace it with a Map and use the MapTypeInfo to try it out before implementing a custom type info.

Also make sure that your MyPOJO type is a nice Flink-supported POJO: all public non-final fields plus an empty constructor.

Let me know if this makes sense :)

Cheers,
Gyula
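For readers who want to see what the two approaches in this message might look like in code, here is a minimal sketch. It assumes Flink 1.9 on the classpath; the class name KryoFreeDescriptors, the MyPojo fields, the state names and the String key type are all hypothetical and not taken from Theo's job.

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.MapTypeInfo;

/** Hypothetical state descriptors that name their type information explicitly. */
public class KryoFreeDescriptors {

    /** Placeholder for the MyPOJO type mentioned in the thread (fields are assumptions). */
    public static class MyPojo {
        public String id;
        public long timestamp;

        public MyPojo() {}
    }

    // Gyula's suggestion for the LinkedList: declare the state as a List and
    // pass a ListTypeInfo instead of a Class.
    public static final ValueStateDescriptor<List<MyPojo>> BUFFER =
            new ValueStateDescriptor<>("buffer", new ListTypeInfo<>(MyPojo.class));

    // Gyula's suggestion for the HashSet: use a Map with a MapTypeInfo;
    // the Boolean value is only a dummy.
    public static final ValueStateDescriptor<Map<String, Boolean>> SEEN_AS_VALUE =
            new ValueStateDescriptor<>("seen", new MapTypeInfo<>(String.class, Boolean.class));

    // Theo's variant: Flink's managed MapState used as a set.
    public static final MapStateDescriptor<String, Boolean> SEEN_AS_MAP_STATE =
            new MapStateDescriptor<>("seen-map", Types.STRING, Types.BOOLEAN);
}
```

The descriptors would be passed to getRuntimeContext().getState(...) or getMapState(...) in the function's open() method. Calling env.getConfig().disableGenericTypes() during testing may also help, since it should make the job fail early wherever a type would otherwise fall back to Kryo.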
Re: ArrayIndexOutOfBoundException on checkpoint creation
Hi Theo!

I have not seen this error before; however, I have encountered many strange things when using Kryo for serialization. From the stack trace, it seems that this might indeed be a Kryo-related issue.

I am not sure what it is, but what I would try is to change the state serializers to non-Kryo variants.

When you create the state, you can specify the TypeInformation object for the state instead of the class.

You have 2 states:
For LinkedList: use the ListTypeInfo class and change your state type to List.
For HashSet: there is no Set type info built into Flink, but you can replace it with a Map and use the MapTypeInfo to try it out before implementing a custom type info.

Also make sure that your MyPOJO type is a nice Flink-supported POJO: all public non-final fields plus an empty constructor.

Let me know if this makes sense :)

Cheers,
Gyula

On Wed, Nov 27, 2019 at 2:18 PM theo.diefent...@scoop-software.de wrote:

> Sorry, I forgot to mention the environment.
> We use Flink 1.9.1 on a Cloudera CDH 6.3.1 cluster (with Hadoop 3.0.0, but
> using Flink shaded 2.8.3-7. Might this be a problem? As it seems to arise
> from Kryo, I doubt it).
> Our Flink is configured as default. Our job uses FsStateBackend and
> exactly-once processing with a Kafka source and sink.
> Best regards
> Theo
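The POJO rule of thumb from Gyula's reply (all public non-final fields plus an empty constructor) can be checked roughly with plain reflection. The sketch below is a simplified approximation and not Flink's actual type analysis, which among other things also accepts private fields that have getters and setters. The names PojoRuleCheck, followsPojoRuleOfThumb, Event and Broken are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedList;

/** Simplified check of the "public non-final fields + empty constructor" rule of thumb. */
class PojoRuleCheck {

    static boolean followsPojoRuleOfThumb(Class<?> clazz) {
        // The class itself must be public, and a nested class must be static.
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) {
            return false;
        }
        // A public constructor without arguments is required.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Every serializable field (not static, not transient) must be public
        // and non-final, and there must be at least one such field.
        int serializableFields = 0;
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int m = f.getModifiers();
                if (Modifier.isStatic(m) || Modifier.isTransient(m) || f.isSynthetic()) {
                    continue;
                }
                if (!Modifier.isPublic(m) || Modifier.isFinal(m)) {
                    return false;
                }
                serializableFields++;
            }
        }
        return serializableFields > 0;
    }

    /** Follows the rule: public fields, public empty constructor. */
    public static class Event {
        public String id;
        public long timestamp;

        public Event() {}
    }

    /** Breaks the rule: final field and no empty constructor. */
    public static class Broken {
        public final String id;

        public Broken(String id) {
            this.id = id;
        }
    }

    public static void main(String[] args) {
        System.out.println(followsPojoRuleOfThumb(Event.class));      // prints "true"
        System.out.println(followsPojoRuleOfThumb(Broken.class));     // prints "false"
        System.out.println(followsPojoRuleOfThumb(LinkedList.class)); // prints "false"
    }
}
```

LinkedList fails this check because all of its instance fields are transient, so no usable fields remain. That is consistent with the thread, where the LinkedList state ended up being serialized by Kryo.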
Re: ArrayIndexOutOfBoundException on checkpoint creation
Sorry, I forgot to mention the environment.

We use Flink 1.9.1 on a Cloudera CDH 6.3.1 cluster (with Hadoop 3.0.0, but using Flink shaded 2.8.3-7. Might this be a problem? As it seems to arise from Kryo, I doubt it).

Our Flink is configured as default. Our job uses FsStateBackend and exactly-once processing with a Kafka source and sink.

Best regards
Theo
Re: ArrayIndexOutOfBoundException on checkpoint creation
Hi,

Which version are you using now? (If you are on an older version, could you please check whether this exception still occurs on Flink 1.9?) Also, did you try the RocksDBStateBackend for this?

Best,
Congxian

Theo Diefenthal wrote on Tue, Nov 26, 2019 at 6:52 PM:

> Hi,
>
> We have a pipeline with a custom ProcessFunction and state (see [1],
> implemented as suggested by Fabian with a ValueState and
> ValueState>)
> The function works fine in our unit tests and with low load in our test
> environment (100,000 records per minute). In the production environment,
> we observe reproducible crashes like the attached one.
> Any idea what could cause this out-of-bounds error?
ArrayIndexOutOfBoundException on checkpoint creation
Hi,

We have a pipeline with a custom ProcessFunction and state (see [1], implemented as suggested by Fabian with a ValueState and ValueState>)

The function works fine in our unit tests and with low load in our test environment (100,000 records per minute). In the production environment, we observe reproducible crashes like the attached one.

Any idea what could cause this out-of-bounds error? Every time we read the state and modify it, we are certain that an .update() was called:

2019-11-26T11:26:55+01:00 host19 java.lang.Exception: Could not materialize checkpoint 7 for operator our_operator) (4/8).
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
2019-11-26T11:26:55+01:00 host19 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
2019-11-26T11:26:55+01:00 host19 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
2019-11-26T11:26:55+01:00 host19 at java.lang.Thread.run(Thread.java:745)
2019-11-26T11:26:55+01:00 host19 Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: 67108864
2019-11-26T11:26:55+01:00 host19 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
2019-11-26T11:26:55+01:00 host19 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
2019-11-26T11:26:55+01:00 host19 ... 3 more
2019-11-26T11:26:55+01:00 host19 Caused by: java.lang.ArrayIndexOutOfBoundsException: 67108864
2019-11-26T11:26:55+01:00 host19 at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
2019-11-26T11:26:55+01:00 host19 at com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
2019-11-26T11:26:55+01:00 host19 at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
2019-11-26T11:26:55+01:00 host19 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:116)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
2019-11-26T11:26:55+01:00 host19 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2019-11-26T11:26:55+01:00 host19 at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
2019-11-26T11:26:55+01:00 host19 ... 5 more
2019-11-26T11:26:55+01:00 host18 WARN org.apache.hadoop.hdfs.DataStreamer - DataStreamer Exception
2019-11-26T11:26:55+01:00 host18 java.io.FileNotFoundException: File does not exist: /.../STATE/CHECKPOINTS/0a2e111b3a800aae0d3b49f33e0db6f3/chk-7/3da2a0a4-f5ef-4e8c-bc1a-9fe892cb0b18 (inode 577546140) Holder DFSClient_NONMAPREDUCE_-1714419242_95 does not have any open files.
2019-11-26T11:26:55+01:00 host18 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2782)
2019-11-26T11:26:55+01:00 host18 at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:599)
2019-11-26T11:26:55+01:00 host18 at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:171)
2019-11-26T11:26:55+01:00 host18 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2661)
2019-11-26T11:26:55+01:00 host18 at