[ https://issues.apache.org/jira/browse/FLINK-9654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529544#comment-16529544 ]

ASF GitHub Bot commented on FLINK-9654:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/6206
  
    Thanks for looking into this @zsolt-donca.
    The changes look good to me, but as @yanghua mentioned, a test covering
    the previous bug would be nice here; it would let the reviewer see
    directly that the fix is required.


> Internal error while deserializing custom Scala TypeSerializer instances
> ------------------------------------------------------------------------
>
>                 Key: FLINK-9654
>                 URL: https://issues.apache.org/jira/browse/FLINK-9654
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Zsolt Donca
>            Priority: Major
>              Labels: pull-request-available
>
> When you use custom `TypeSerializer` instances implemented in Scala,
> the Scala issue [SI-2034|https://issues.scala-lang.org/browse/SI-2034] can
> manifest itself when a Flink job is restored from a checkpoint or started
> from a savepoint.
> The reason is that, when restoring from a checkpoint or savepoint, Flink uses
> `InstantiationUtil.FailureTolerantObjectInputStream` to deserialize the type
> serializers and their configurations. The deserialization walks through the
> entire corresponding object graph, and for each class it calls
> `isAnonymousClass`, which in turn calls `getSimpleName` (a mechanism put in
> place for FLINK-6869). If the graph contains a nested class defined in a Scala
> object for which `getSimpleName` fails (see the Scala issue), a
> `java.lang.InternalError` is thrown, which causes the task manager to restart.
> Flink then tries to restart the job on another task manager, which fails in
> the same way, so all the task managers restart in turn, wreaking havoc on the
> entire Flink cluster.
> Some alternative mechanisms for deriving type information rely on anonymous
> classes and, most importantly, on classes generated by macros, both of which
> can easily trigger the above problem. I am personally working on
> [https://github.com/zsolt-donca/flink-alt], and there is also
> [https://github.com/joroKr21/flink-shapeless].
> I prepared a pull request that fixes the issue.
>  
> Edit: added a stack trace to help demonstrate the issue.
> 2018-06-21 13:08:07.829 [today-stats (2/2)] ERROR org.apache.flink.runtime.taskmanager.Task  - Encountered fatal error java.lang.InternalError - terminating the JVM
> java.lang.InternalError: Malformed class name
>         at java.lang.Class.getSimpleName(Class.java:1330) ~[na:1.8.0_171]
>         at java.lang.Class.isAnonymousClass(Class.java:1411) ~[na:1.8.0_171]
>         at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:206) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1855) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[na:1.8.0_171]
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[na:1.8.0_171]
>         at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:138) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:480) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:446) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:282) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:224) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>         at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
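
The restore path described in the issue can be sketched in plain Java. This is a minimal, hypothetical illustration, not the code from Flink's `InstantiationUtil` or from the pull request: `safeIsAnonymousClass`, `InspectingObjectInputStream`, `Holder` and `makeAnonymous` are names invented for the example. It overrides `ObjectInputStream.readClassDescriptor`, the hook that appears in the stack trace, and assumes the fix amounts to catching the `InternalError` around `isAnonymousClass` so that it cannot escape and terminate the JVM. The Scala-specific failure cannot be reproduced from plain Java, so the demo only exercises ordinary named and anonymous classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SafeClassDescriptorDemo {

    // Hypothetical guarded replacement for Class#isAnonymousClass. On the JDK 8
    // runtime in the stack trace, isAnonymousClass() calls getSimpleName(), which
    // can throw InternalError("Malformed class name") for some Scala-compiled
    // nested classes (SI-2034). This sketch treats that case as "not anonymous".
    static boolean safeIsAnonymousClass(Class<?> clazz) {
        try {
            return clazz.isAnonymousClass();
        } catch (InternalError e) {
            return false;
        }
    }

    // Hypothetical stream that inspects each class descriptor while the object
    // graph is read, similar in spirit to (but not a copy of) Flink's
    // InstantiationUtil.FailureTolerantObjectInputStream.
    static class InspectingObjectInputStream extends ObjectInputStream {
        int anonymousClassesSeen = 0;

        InspectingObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
            ObjectStreamClass streamDescriptor = super.readClassDescriptor();
            // Invoked once per non-proxy class descriptor found in the stream.
            Class<?> localClass = resolveClass(streamDescriptor);
            if (safeIsAnonymousClass(localClass)) {
                anonymousClassesSeen++;
            }
            return streamDescriptor;
        }
    }

    // Named serializable class wrapping an arbitrary payload.
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        final Serializable payload;

        Holder(Serializable payload) {
            this.payload = payload;
        }
    }

    // Anonymous class created in a static context, so it captures no outer instance.
    static Serializable makeAnonymous() {
        return new Serializable() {
            private static final long serialVersionUID = 1L;
            int value = 42;
        };
    }

    // Serializes the graph, reads it back, and reports how many anonymous
    // class descriptors the inspecting stream encountered.
    static int countAnonymousClasses(Object graph) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(graph);
        }
        try (InspectingObjectInputStream in =
                 new InspectingObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            in.readObject();
            return in.anonymousClassesSeen;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(safeIsAnonymousClass(Holder.class));                 // false
        System.out.println(safeIsAnonymousClass(makeAnonymous().getClass()));   // true
        System.out.println(countAnonymousClasses(new Holder(makeAnonymous()))); // 1
    }
}
```

Returning `false` when the simple name cannot be computed is a conservative choice made for this sketch: the class just gets no special anonymous-class treatment, and deserialization continues as it would for an ordinary class instead of taking down the task manager.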



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
