[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14959143#comment-14959143
 ] 

Stefano Bortoli commented on FLINK-2800:
----------------------------------------

Ok, I have run a couple of tests more, and the process was completed without 
the KryoPool. However, when I reverted back to the global variable for Input 
and Output, I could reproduce the error. 

{quote}
10/15/2015 18:05:59     Job execution switched to status FAILED.
2015-10-15 18:05:59 INFO  JobManager:137 - Status of job 
1b05e39a7ea019d0a57702eb2a06d64a (Flink Java Job at Thu Oct 15 18:03:33 CEST 
2015) changed to FAILED.
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 115
        at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:667)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:778)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:153)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:787)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:787)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:251)
        at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
        at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:1)
        at 
org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
        at 
org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
        at 
org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
        at 
org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
2015-10-15 18:05:59 INFO  JobClient:200 - Job execution failed
{quote}

This version of the method does not work:
{code}
@SuppressWarnings("unchecked")
        @Override
        public T deserialize(DataInputView source) throws IOException {
                if (source != previousIn) {
                        previousIn = source;
                        DataInputViewStream inputStream = new 
DataInputViewStream(source);
                        input = new NoFetchingInput(inputStream);
                }
//              DataInputViewStream inputStream = new 
DataInputViewStream(source);
//              Input input = new NoFetchingInput(inputStream);
                checkKryoPoolInitialization();
//              Kryo kryo = kryoPool.borrow();
                try {
                        return (T) kryo.readClassAndObject(input);
                } catch (KryoException ke) {
                        Throwable cause = ke.getCause();

                        if (cause instanceof EOFException) {
                                throw (EOFException) cause;
                        } else {
                                throw ke;
                        }
                } finally {
//                      kryoPool.release(kryo);
                }
        }
{code}

whereas this one worked:
{code}
@SuppressWarnings("unchecked")
        @Override
        public T deserialize(DataInputView source) throws IOException {
                if (source != previousIn) {
                        previousIn = source;
//                      DataInputViewStream inputStream = new 
DataInputViewStream(source);
//                      input = new NoFetchingInput(inputStream);
                }
                DataInputViewStream inputStream = new 
DataInputViewStream(source);
                Input input = new NoFetchingInput(inputStream);
                checkKryoPoolInitialization();
//              Kryo kryo = kryoPool.borrow();
                try {
                        return (T) kryo.readClassAndObject(input);
                } catch (KryoException ke) {
                        Throwable cause = ke.getCause();

                        if (cause instanceof EOFException) {
                                throw (EOFException) cause;
                        } else {
                                throw ke;
                        }
                } finally {
//                      kryoPool.release(kryo);
                }
        }

{code}


I hope this helps either solving the problem, or finding the cause of the 
problem. 

> kryo serialization problem
> --------------------------
>
>                 Key: FLINK-2800
>                 URL: https://issues.apache.org/jira/browse/FLINK-2800
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 0.10
>         Environment: linux ubuntu 12.04 LTS, Java 7
>            Reporter: Stefano Bortoli
>
> Performing a cross of two dataset of POJOs I have got the exception below. 
> The first time I run the process, there was no problem. When I run it the 
> second time, I have got the exception. My guess is that it could be a race 
> condition related to the reuse of the Kryo serializer object. However, it 
> could also be "a bug where type registrations are not properly forwarded to 
> all Serializers", as suggested by Stephan.
> ------------------------------------------------------------------------
> 2015-10-01 18:18:21 INFO  JobClient:161 - 10/01/2015 18:18:21 Cross(Cross at 
> main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED 
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 114
>       at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>       at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>       at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>       at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>       at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>       at 
> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>       at 
> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>       at 
> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>       at 
> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>       at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>       at 
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>       at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to