Hi all, we found the solution: the problem was the Comparator in the TreeSet we used as the value of the broadcast state. Kryo is unable to serialize a lambda used as the Comparator, so we changed it to a regular class, and everything works fine now.
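For illustration, here is a minimal sketch of the change described above. It assumes a TreeSet<String> value with a made-up ordering; the post does not say what the set holds or how it is sorted, so the element type, `ComparatorExample`, `ByLengthThenNatural` and `newBroadcastValue` are all hypothetical. The stack trace shows Kryo's TreeSetSerializer writing the set's comparator with writeClassAndObject, so the comparator's class has to be one Kryo can write. A named class instead of a lambda is the fix reported here. Implementing Serializable and providing a public no-arg constructor is a conservative choice on top of that, not something the thread confirms is required.

```java
import java.io.Serializable;
import java.util.Comparator;
import java.util.TreeSet;

class ComparatorExample {

    // Hypothetical ordering: shorter strings first, ties broken alphabetically.
    // A regular (named) class replaces a lambda such as (a, b) -> ...
    public static class ByLengthThenNatural implements Comparator<String>, Serializable {
        private static final long serialVersionUID = 1L;

        // Public no-arg constructor (assumption: a safe choice for reflective instantiation).
        public ByLengthThenNatural() {}

        @Override
        public int compare(String a, String b) {
            int byLength = Integer.compare(a.length(), b.length());
            return byLength != 0 ? byLength : a.compareTo(b);
        }
    }

    // Hypothetical factory for the broadcast-state value.
    // Problematic version from the thread: new TreeSet<>((a, b) -> ...)
    public static TreeSet<String> newBroadcastValue() {
        return new TreeSet<>(new ByLengthThenNatural());
    }

    public static void main(String[] args) {
        TreeSet<String> set = newBroadcastValue();
        set.add("ccc");
        set.add("a");
        set.add("bb");
        set.add("aa");
        System.out.println(set); // prints [a, aa, bb, ccc]
    }
}
```

Only the comparator's class changes. The TreeSet behaves as before, but it no longer carries a lambda class into Kryo when the broadcast state is snapshotted.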
Best regards,
Vasily Melnik
GlowByte Consulting <http://www.gbconsulting.ru/>
===================
Mobile: +7 (903) 101-43-71
vasily.mel...@glowbyteconsulting.com

On Fri, 15 Nov 2019 at 14:29, Vasily Melnik <vasily.mel...@glowbyteconsulting.com> wrote:

> Hi all.
> In Flink 1.8 we have a strange exception that causes the job to fail:
>
> 2019-11-14 15:52:52,071 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - op4 (1/1) (797d4c2b85010dab6be5e1d06ff6493a) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator op4 (1/1).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator op4 (1/1).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>     ... 5 more
> Caused by: java.lang.NullPointerException
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
>     at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:608)
>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:605)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
>     at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>     ... 7 more
>
> As we can see, the exception occurs in
> org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109),
> but what exactly is the reason?
>
> We configured the RocksDB state backend for the job, with local filesystem storage.
>
> Best regards,
> Vasily Melnik