Thomas Bay created FLINK-37830:
----------------------------------
Summary: Restore checkpoint state in the ES connector fails with
NPE on ArrayList when using flink-connector-elasticsearch8 ver. 3.1.0-1.20 with
flink ver. 1.20.1
Key: FLINK-37830
URL: https://issues.apache.org/jira/browse/FLINK-37830
Project: Flink
Issue Type: Bug
Components: Connectors / ElasticSearch
Environment: flink-connector-elasticsearch8 ver. 3.1.0-1.20
flink ver. 1.20.1
Reporter: Thomas Bay
When using flink-connector-elasticsearch8 ver. 3.1.0-1.20 with flink ver.
1.20.1, restoring checkpoint state in the connector fails with a
NullPointerException in ArrayList:
{noformat}
2025-05-12 08:14:09 com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ArrayNode)
_children (com.fasterxml.jackson.databind.node.ObjectNode)
_children (com.fasterxml.jackson.databind.node.ObjectNode)
_children (com.fasterxml.jackson.databind.node.ObjectNode)
_children (com.fasterxml.jackson.databind.node.ObjectNode)
document (co.elastic.clients.elasticsearch.core.bulk.IndexOperation)
bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51)
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39)
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30)
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81)
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39)
    at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
    at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
    at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
    at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
    at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:163)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
    at java.base/java.util.ArrayList.ensureCapacity(Unknown Source)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:96)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 56 more
{noformat}
The fix is to use Kryo's StdInstantiatorStrategy as the fallback instantiator
strategy rather than as the main one, as is done in other places in the Flink
code.
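For illustration only, here is a stand-alone sketch of why the order of the two strategies matters. It does not use Kryo or the connector code. The class and method names are hypothetical, and the JDK-internal sun.misc.Unsafe is used only as a stand-in for constructor-bypassing instantiation, which is my understanding of what StdInstantiatorStrategy does. An ArrayList allocated that way has no backing array, so ensureCapacity throws the NullPointerException seen in the trace. Trying the no-arg constructor first avoids this.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;

/** Hypothetical sketch; none of these names come from Flink, Kryo, or the PR. */
public class InstantiatorFallbackSketch {

    /**
     * Allocates an object WITHOUT running any constructor. This is an assumed
     * stand-in for constructor-bypassing instantiation, for illustration only.
     */
    public static <T> T allocateWithoutConstructor(Class<T> type) {
        try {
            Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
            Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            Object unsafe = theUnsafe.get(null);
            Method allocate = unsafeClass.getMethod("allocateInstance", Class.class);
            return type.cast(allocate.invoke(unsafe, type));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot allocate " + type, e);
        }
    }

    /** Main strategy: no-arg constructor. Fallback: constructor bypass. */
    public static <T> T constructorFirst(Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (NoSuchMethodException e) {
            // Only types without a no-arg constructor reach the fallback.
            return allocateWithoutConstructor(type);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Constructor failed for " + type, e);
        }
    }

    /** Returns true if ensureCapacity throws a NullPointerException on the list. */
    public static boolean failsOnEnsureCapacity(ArrayList<?> list) {
        try {
            list.ensureCapacity(16);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ArrayList<?> bypassed = allocateWithoutConstructor(ArrayList.class);
        ArrayList<?> constructed = constructorFirst(ArrayList.class);
        System.out.println("bypass as main strategy fails: " + failsOnEnsureCapacity(bypassed));
        System.out.println("constructor first fails: " + failsOnEnsureCapacity(constructed));
    }
}
```

In Kryo 2.x, which I assume is the version bundled with Flink 1.20, the equivalent configuration would be to create a Kryo.DefaultInstantiatorStrategy, call setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()) on it, and pass it to kryo.setInstantiatorStrategy(...).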
The fix is included in [Pull Request
126|https://github.com/apache/flink-connector-elasticsearch/pull/126]
The committed change fixed the problem for me.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)