Hi all,
We found the solution.
The problem was the Comparator in the TreeSet we use as the value of the
broadcast state. Kryo is unable to serialize a Comparator defined as a
lambda, so we replaced it with a regular class, and everything works now.
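
For anyone hitting the same thing, here is a minimal sketch of that kind of
change. The class names, the String element type and the ordering are all
hypothetical (our real comparator is not shown in this thread), and
implementing Serializable is an extra assumption rather than something we
verified to be required. The point is only that the TreeSet is built with a
named comparator class instead of a lambda:

```java
import java.io.Serializable;
import java.util.Comparator;
import java.util.TreeSet;

public class NamedComparatorExample {

    // Hypothetical replacement for a lambda such as
    //   new TreeSet<String>((a, b) -> ...)
    // A named class has a stable, ordinary class name, unlike the
    // synthetic class generated for a lambda.
    public static final class LengthThenAlphaComparator
            implements Comparator<String>, Serializable {

        private static final long serialVersionUID = 1L;

        @Override
        public int compare(String a, String b) {
            // Order by length first, then alphabetically (illustrative only).
            int byLength = Integer.compare(a.length(), b.length());
            return byLength != 0 ? byLength : a.compareTo(b);
        }
    }

    // Builds the set that would be stored as the broadcast state value.
    public static TreeSet<String> newSortedSet() {
        return new TreeSet<>(new LengthThenAlphaComparator());
    }

    public static void main(String[] args) {
        TreeSet<String> set = newSortedSet();
        set.add("ccc");
        set.add("a");
        set.add("bb");
        System.out.println(set); // prints [a, bb, ccc]
    }
}
```

The set behaves the same as before; only the way the comparator is declared
changes.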


Best regards,
Vasily Melnik

*GlowByte Consulting* <http://www.gbconsulting.ru/>

===================

Mobile: +7 (903) 101-43-71
vasily.mel...@glowbyteconsulting.com


On Fri, 15 Nov 2019 at 14:29, Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi all.
> In Flink 1.8 we are seeing a strange exception that causes the job to fail:
>
> 2019-11-14 15:52:52,071 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - op4 (1/1)
> (797d4c2b85010dab6be5e1d06ff6493a) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 2 for operator op4 (1/1).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
> operator op4 (1/1).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NullPointerException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> Caused by: java.lang.NullPointerException
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
> at
> com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:608)
> at
> com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:605)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
> at
> org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> ... 7 more
>
> As we can see, the exception occurs in
> *org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)*
> but what exactly is the reason?
>
> We configured the RocksDB state backend for the job, with local filesystem
> storage.
>
>
> Best regards,
> Vasily Melnik
>
