Hi Prasanna,

It could be a bug where the ExecutionConfig is not forwarded properly to all the places where the KryoSerializer is used.

As a first debugging step, I would recommend creating a custom TypeInformation (most methods are irrelevant except for createSerializer and getTypeClass) that instantiates the KryoSerializer manually. You can then pass this type information to all operators (using dataStream.returns()) and to the state descriptors. This ensures that all locations use the same KryoSerializer configuration and that no registered IDs get lost somewhere in the stack.
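A minimal sketch of such a TypeInformation, assuming the Flink 1.12 APIs (the class name FixedKryoTypeInfo is hypothetical, not something Flink provides). It captures one ExecutionConfig at construction time and always builds its KryoSerializer from that config, ignoring whatever config Flink passes to createSerializer():

```java
import java.util.Objects;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

// Hypothetical debugging helper: every serializer it creates uses the same
// captured ExecutionConfig, so all operators and state descriptors that use
// this type information share identical Kryo registrations.
final class FixedKryoTypeInfo<T> extends TypeInformation<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> typeClass;
    // ExecutionConfig is Serializable, so it ships together with this object.
    private final ExecutionConfig kryoConfig;

    FixedKryoTypeInfo(Class<T> typeClass, ExecutionConfig kryoConfig) {
        this.typeClass = Objects.requireNonNull(typeClass);
        this.kryoConfig = Objects.requireNonNull(kryoConfig);
    }

    @Override
    public TypeSerializer<T> createSerializer(ExecutionConfig ignored) {
        // Deliberately ignore the passed-in config and use the captured one.
        return new KryoSerializer<>(typeClass, kryoConfig);
    }

    @Override
    public Class<T> getTypeClass() {
        return typeClass;
    }

    // The remaining methods do not matter much here; these values mirror
    // what a generic (Kryo-backed) type reports.
    @Override public boolean isBasicType() { return false; }
    @Override public boolean isTupleType() { return false; }
    @Override public int getArity() { return 1; }
    @Override public int getTotalFields() { return 1; }
    @Override public boolean isKeyType() { return false; }

    @Override
    public String toString() {
        return "FixedKryoTypeInfo<" + typeClass.getName() + ">";
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof FixedKryoTypeInfo)) {
            return false;
        }
        FixedKryoTypeInfo<?> other = (FixedKryoTypeInfo<?>) obj;
        // Equal only if both the type and the Kryo configuration match.
        return other.canEqual(this)
            && typeClass == other.typeClass
            && kryoConfig.equals(other.kryoConfig);
    }

    @Override
    public int hashCode() {
        return typeClass.hashCode();
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof FixedKryoTypeInfo;
    }
}
```

For the broadcast state, the value type of the MapStateDescriptor could then be new FixedKryoTypeInfo<>(EventConfigState.class, kryoConfig), where kryoConfig is an ExecutionConfig carrying the same addDefaultKryoSerializer(...) call as the job; the same instance can be passed to dataStream.returns(...). Treat this as a debugging aid rather than a fix: if the exceptions disappear, that would suggest the registrations were diverging somewhere.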

You can also verify locally (either in your IDE or with a remote debugger) that all KryoSerializer instances have the same registry entry for class ID 97.
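For that check, a small helper along these lines could be evaluated at a breakpoint (for example inside KryoSerializer.copy) or called from a local test. It assumes the getKryo() accessor of Flink 1.12's KryoSerializer, which is meant for tests and not a stable API, plus Kryo's getRegistration(int); the name KryoRegistryCheck is hypothetical:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

// Hypothetical helper for comparing Kryo registries across serializer instances.
final class KryoRegistryCheck {

    private KryoRegistryCheck() {}

    // Returns the class name registered under the given Kryo class ID,
    // or "unregistered" if this serializer's Kryo instance has no entry for it.
    static String describe(KryoSerializer<?> serializer, int classId) {
        // getKryo() initializes the Kryo instance (if needed) with the
        // registrations derived from the serializer's ExecutionConfig.
        Kryo kryo = serializer.getKryo();
        Registration registration = kryo.getRegistration(classId);
        return registration == null ? "unregistered" : registration.getType().getName();
    }
}
```

Calling describe(serializer, 97) in each subtask (or on each serializer instance you can reach in the debugger) should return the same class name everywhere; a mismatch or an "unregistered" result on one side would point at the diverging configuration.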

Where is UnmodifiableCollectionsSerializer coming from?

Regards,
Timo


On 01.11.21 11:34, Prasanna kumar wrote:
Any thoughts on this?

Thanks,
Prasanna.

On Sat, Oct 30, 2021 at 7:25 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:

    Hi,

    We have the following Flink job that processes records from Kafka
    based on rules that we load from S3 files into broadcast state.
    Earlier we were able to run the job with any parallelism without
    any issues.
    Recently we changed the structure of the broadcast state, and it
    works well up to a parallelism of 3.
    With a parallelism of 4 or more, we get serialization exceptions
    that cause the job to fail (block 4 in the image).
    Also, if the leader JobManager dies and a new one comes up, the
    other jobs are restarted automatically, but this job fails with
    serialization errors.
    But when we start it manually with a parallelism <= 3, it works.

    Functionally, the code works for all the test cases we have tried.
    How can we debug the serialization issues that we are facing?
    I have attached the exception logs and the related code.

    Let me know if any more details are required.





    *KRYO SERIALIZER INITIALIZATION*

    Class<?> unmodifiableCollectionsSerializer =
        Class.forName("java.util.Collections$UnmodifiableCollection");
    env.getConfig().addDefaultKryoSerializer(
        unmodifiableCollectionsSerializer,
        UnmodifiableCollectionsSerializer.class
    );


    *CONFIG STATE INTERFACE (USED IN BROADCAST STATE)*

    public interface EventConfigState {

        void createOrUpdateState(String key, DataPair dataPair);

        List<OutputMessage> executeRule(InputMessage inputMessage);

        Map<String, Set<DataPair>> getCurrentState();
    }


    *DERIVED EVENT CONFIG STATE IMPLEMENTATION*

    public class DerivedEventConfigState implements EventConfigState {

        Logger logger = LoggerFactory.getLogger(DerivedEventConfigState.class);
        private Map<String, Set<DataPair>> derivedConfigMap;

        public DerivedEventConfigState() {
            derivedConfigMap = new HashMap<>();
        }

        public void createOrUpdateState(String key, DataPair dataPair) {

            derivedConfigMap.putIfAbsent(key, new HashSet<>());
            if (derivedConfigMap.get(key).contains(dataPair)) {
                derivedConfigMap.get(key).remove(dataPair);
            }
            derivedConfigMap.get(key).add(dataPair);
        }

        @Override
        public List<OutputMessage> executeRule(InputMessage inputMessage) {

            String key = inputMessage.getKey();
            List<OutputMessage> outputMessageList = new ArrayList<>();

            if (derivedConfigMap.size() == 0) {
                logger.error("DerivedEventConfigMap is empty");
                return outputMessageList;
            }

            if (derivedConfigMap.get(key) == null) {
                return outputMessageList;
            }

            for (DataPair dataPair : derivedConfigMap.get(key)) {
                IRule rule = dataPair.getRule();
                if (rule.isSatisfied(inputMessage)) {

                    IMessageBuilder messageBuilder =
                        MessageBuilderFactory.getMessageBuilder("OutputMessage");
                    OutputMessage outputMessage = messageBuilder.build(
                        inputMessage,
                        dataPair.getEventMessageDefinition()
                    );
                    outputMessageList.add(outputMessage);
                }
            }
            return outputMessageList;
        }

        @Override
        public Map<String, Set<DataPair>> getCurrentState() {
            return Collections.unmodifiableMap(derivedConfigMap);
        }

        @Override
        public String toString() {
            return "DerivedEventConfigState{"
                + "derivedConfigMap=" + derivedConfigMap
                + '}';
        }
    }


    Attached are the three exceptions, which are thrown randomly:

    Caused by: org.apache.flink.util.SerializedThrowable: TABLE-OP
        at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at java.lang.Class.forName0(Native Method) ~[?:?]
        at java.lang.Class.forName(Unknown Source) ~[?:?]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        ... 14 more


    Caused by: org.apache.flink.util.SerializedThrowable: Encountered unregistered class ID: 97
    Serialization trace:
    derivedConfigMap (com.org.app.model.producer.DerivedEventConfigState)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        ... 14 more


    Caused by: org.apache.flink.util.SerializedThrowable: Buffer underflow.
    Serialization trace:
    logger (com.org.app.model.producer.AppEventConfigState)
        at com.esotericsoftware.kryo.io.Input.require(Input.java:181) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        ... 24 more

    Thanks,
    Prasanna

