[ https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16781413#comment-16781413 ]
Dawid Wysakowicz commented on FLINK-11420:
------------------------------------------

I found out that there is a bug in the {{TraversableSerializer#duplicate}} method.

> Serialization of case classes containing a Map[String, Any] sometimes throws
> ArrayIndexOutOfBoundsException
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11420
>                 URL: https://issues.apache.org/jira/browse/FLINK-11420
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.7.1
>            Reporter: Jürgen Kreileder
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> We frequently run into random ArrayIndexOutOfBounds exceptions when Flink
> tries to serialize Scala case classes containing a Map[String, Any] (Any
> being String, Long, Int, or Boolean) with the FsStateBackend. (This probably
> happens with any case class containing a type requiring Kryo; see this thread
> for instance:
> [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e])
> Disabling asynchronous snapshots seems to work around the problem, so maybe
> something is not thread-safe in CaseClassSerializer.
> Our objects look like this:
> {code}
> case class Event(timestamp: Long, [...], content: Map[String, Any])
> case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
> {code}
> I've looked at a few of the exceptions in a debugger. It always happens when
> serializing the right-hand side of a tuple from EnrichedEvent -> Event ->
> content, e.g. 13 from ("foo", 13) or false from ("bar", false).
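A minimal sketch of why a broken {{duplicate}} would only bite with asynchronous snapshots, under a deliberately simplified model. Flink's {{TypeSerializer#duplicate()}} is documented to return a deep copy when a serializer is stateful (a stateless one may return itself), because one serializer can be used from several threads, e.g. the task thread and the asynchronous snapshot thread. Every name below ({{Ser}}, {{StringSer}}, {{ScratchSer}}, {{PairSer}}, {{SeqSer}}, {{ShallowSeqSer}}, {{leafOf}}, {{race}}) is hypothetical and not Flink's API. {{ScratchSer}} stands in for the Kryo-backed serializer of the Any values, and {{ShallowSeqSer}} shows one way a composite's duplicate can go wrong by sharing its nested serializer; it is an illustration, not a claim about what the actual defect in TraversableSerializer was.

```scala
// Toy model of a serializer tree. All names are hypothetical stand-ins, not Flink's API.
trait Ser[T] {
  def copy(value: T): T
  // Modeled on TypeSerializer#duplicate(): deep-copy if stateful, else may return this.
  def duplicate(): Ser[T]
}

// Stateless leaf (cf. a String serializer): sharing is safe, so duplicate() returns this.
final class StringSer extends Ser[String] {
  def copy(value: String): String = value
  def duplicate(): Ser[String] = this
}

// Stateful leaf standing in for the Kryo-backed serializer of the Any values:
// a copy is split into begin/finish around a mutable field, so one instance
// must not be used by two threads at the same time.
final class ScratchSer extends Ser[Any] {
  private var scratch: Any = null
  def begin(value: Any): Unit = scratch = value
  def finish(): Any = scratch
  def copy(value: Any): Any = { begin(value); finish() }
  def duplicate(): Ser[Any] = new ScratchSer
}

// Composite for the (String, Any) entries of a Map[String, Any].
final class PairSer(val key: Ser[String], val value: Ser[Any]) extends Ser[(String, Any)] {
  def copy(p: (String, Any)): (String, Any) = (key.copy(p._1), value.copy(p._2))
  def duplicate(): Ser[(String, Any)] = {
    val k = key.duplicate()
    val v = value.duplicate()
    // Return this only if every nested serializer is stateless.
    if ((k eq key) && (v eq value)) this else new PairSer(k, v)
  }
}

// Collection composite with a correct, recursive duplicate().
final class SeqSer[E](val elem: Ser[E]) extends Ser[Seq[E]] {
  def copy(xs: Seq[E]): Seq[E] = xs.map(elem.copy)
  def duplicate(): Ser[Seq[E]] = {
    val e = elem.duplicate()
    if (e eq elem) this else new SeqSer(e)
  }
}

// Broken variant for contrast: the "duplicate" still shares the nested serializer.
final class ShallowSeqSer[E](val elem: Ser[E]) extends Ser[Seq[E]] {
  def copy(xs: Seq[E]): Seq[E] = xs.map(elem.copy)
  def duplicate(): Ser[Seq[E]] = new ShallowSeqSer(elem)
}

// Walks collection -> pair -> value serializer to reach the stateful leaf.
def leafOf(s: Ser[Seq[(String, Any)]]): ScratchSer = {
  val elem: Ser[_] = s match {
    case d: SeqSer[_]        => d.elem
    case d: ShallowSeqSer[_] => d.elem
  }
  elem.asInstanceOf[PairSer].value.asInstanceOf[ScratchSer]
}

// Replays one interleaving by hand: the task thread and the snapshot thread
// both begin a copy before either finishes. Returns what each one gets back.
def race(task: ScratchSer, snapshot: ScratchSer): (Any, Any) = {
  task.begin(13)        // task thread copies 13 from ("foo", 13)
  snapshot.begin(false) // snapshot thread copies false from ("bar", false)
  (task.finish(), snapshot.finish())
}

val original = new SeqSer(new PairSer(new StringSer, new ScratchSer))
println(race(leafOf(original), leafOf(original.duplicate()))) // prints (13,false)

val shallow = new ShallowSeqSer(new PairSer(new StringSer, new ScratchSer))
println(race(leafOf(shallow), leafOf(shallow.duplicate())))   // prints (false,false)
```

With a properly duplicated tree each thread gets its own stateful leaf; with the shallow variant both threads share one, and the task thread reads the snapshot thread's value. In this toy the symptom is a wrong value rather than Kryo's ArrayIndexOutOfBoundsException, but it is the same kind of failure: two threads mutating one serializer's internal state, which would also explain why disabling asynchronous snapshots hides the problem.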
> Stacktrace:
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0
>     at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69)
>     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
>     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>     at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
>     at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)