Any thoughts on this?

Thanks,
Prasanna.

On Sat, Oct 30, 2021 at 7:25 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi,
>
> We have a Flink job that processes records from Kafka based on rules
> that we load from S3 files into broadcast state.
> Earlier we were able to start the job with any parallelism
> without any issues.
> Recently we changed the structure of the broadcast state, and the job
> works well up to a parallelism of 3.
> If we set a parallelism of 4 or more, we get
> serialization exceptions that cause the job to fail (Block 4 in the
> image).
> Also, if the leader JobManager dies and a new one comes up, the other
> jobs are restarted automatically, but this job dies with serialization
> issues.
> When we start it manually with a parallelism <= 3, it works.
>
> The code passes all the test cases we have tried.
> How do we debug these serialization issues?
> I have attached the exception logs and the related code.
>
> Let me know if any more details are required.
>
>
>
>
>
> *KRYO SERIALIZER INITIALISATION*
>
> Class<?> unmodifiableCollectionsSerializer =
>     Class.forName("java.util.Collections$UnmodifiableCollection");
> env.getConfig().addDefaultKryoSerializer(
>     unmodifiableCollectionsSerializer,
>     UnmodifiableCollectionsSerializer.class
> );
>
>
> *CONFIGSTATE INTERFACE (USED IN BROADCAST STATE)*
>
> public interface EventConfigState {
>
>
>     void createOrUpdateState(String key, DataPair dataPair);
>
>     List<OutputMessage> executeRule(InputMessage inputMessage);
>
>     Map<String, Set<DataPair>> getCurrentState();
> }
>
>
> *DERIVED EVENT CONFIG STATE IMPLEMENTATION*
>
> public class DerivedEventConfigState implements EventConfigState {
>
>     Logger logger = LoggerFactory.getLogger(DerivedEventConfigState.class);
>     private Map<String, Set<DataPair>> derivedConfigMap;
>
>     public DerivedEventConfigState() {
>         derivedConfigMap = new HashMap<>();
>     }
>
>     @Override
>     public void createOrUpdateState(String key, DataPair dataPair) {
>
>         derivedConfigMap.putIfAbsent(key, new HashSet<>());
>         if (derivedConfigMap.get(key).contains(dataPair)) {
>             derivedConfigMap.get(key).remove(dataPair);
>         }
>         derivedConfigMap.get(key).add(dataPair);
>     }
>
>     @Override
>     public List<OutputMessage> executeRule(InputMessage inputMessage) {
>
>         String key = inputMessage.getKey();
>         List<OutputMessage> outputMessageList = new ArrayList<>();
>
>         if (derivedConfigMap.isEmpty()) {
>             logger.error("DerivedEventConfigMap is empty");
>             return outputMessageList;
>         }
>
>         if (derivedConfigMap.get(key) == null) {
>             return outputMessageList;
>         }
>
>         for (DataPair dataPair : derivedConfigMap.get(key)) {
>             IRule rule = dataPair.getRule();
>             if (rule.isSatisfied(inputMessage)) {
>
>                 IMessageBuilder messageBuilder =
>                     MessageBuilderFactory.getMessageBuilder("OutputMessage");
>                 OutputMessage outputMessage = messageBuilder.build(
>                     inputMessage,
>                     dataPair.getEventMessageDefinition()
>                 );
>                 outputMessageList.add(outputMessage);
>             }
>         }
>         return outputMessageList;
>     }
>
>     @Override
>     public Map<String, Set<DataPair>> getCurrentState() {
>         return Collections.unmodifiableMap(derivedConfigMap);
>     }
>
>     @Override
>     public String toString() {
>         return "DerivedEventConfigState{"
>             + "derivedConfigMap=" + derivedConfigMap
>             + '}';
>     }
> }
>
>
> Attached are three Exceptions thrown rando
>
> Caused by: org.apache.flink.util.SerializedThrowable: TABLE-OP
>     at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>     at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.lang.Class.forName0(Native Method) ~[?:?]
>     at java.lang.Class.forName(Unknown Source) ~[?:?]
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     ... 14 more
>
>
> Caused by: org.apache.flink.util.SerializedThrowable: Encountered unregistered class ID: 97
> Serialization trace:
> derivedConfigMap (com.org.app.model.producer.DerivedEventConfigState)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     ... 14 more
>
> Caused by: org.apache.flink.util.SerializedThrowable: Buffer underflow.
> Serialization trace:
> logger (com.org.app.model.producer.AppEventConfigState)
>     at com.esotericsoftware.kryo.io.Input.require(Input.java:181) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     ... 24 more
>
> Thanks,
> Prasanna
>
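
One possible starting point for debugging: the third trace reports "Serialization trace: logger", which suggests that Kryo's field-based serializer is walking into a logger held as an instance field of the state class. Kryo's FieldSerializer skips static and transient fields by default, so declaring the logger as private static final is one way to keep it out of the broadcast state. The sketch below uses only the JDK to list the fields such a serializer would visit. RuleState and RuleStateWithInstanceLogger are hypothetical stand-ins for the posted classes, and java.util.logging replaces SLF4J so that the example has no dependencies. This is a diagnostic aid under those assumptions, not a confirmed fix for the parallelism-dependent failures.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

public class SerializedFieldsCheck {

    /** Hypothetical state class: the logger is static, so it is not instance state. */
    static class RuleState {
        private static final Logger LOGGER = Logger.getLogger(RuleState.class.getName());
        private final Map<String, Set<String>> rules = new HashMap<>();

        void add(String key, String rule) {
            rules.computeIfAbsent(key, k -> new HashSet<>()).add(rule);
        }
    }

    /** Same shape, but the logger is an instance field, as in the posted class. */
    static class RuleStateWithInstanceLogger {
        Logger logger = Logger.getLogger("example");
        private final Map<String, Set<String>> rules = new HashMap<>();
    }

    /** Sorted names of the non-static, non-transient fields declared by a class. */
    static List<String> serializedFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (!Modifier.isStatic(mods) && !Modifier.isTransient(mods) && !field.isSynthetic()) {
                names.add(field.getName());
            }
        }
        Collections.sort(names); // reflection does not guarantee declaration order
        return names;
    }

    public static void main(String[] args) {
        System.out.println(serializedFieldNames(RuleState.class));
        System.out.println(serializedFieldNames(RuleStateWithInstanceLogger.class));
    }
}
```

Running the same check against the real state classes (and the types they reference, such as DataPair) would show which fields end up in the copy made during HeapBroadcastState.deepCopy. Anything that is shared between subtasks in one JVM, such as a logger, is a candidate for static or transient.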
