[ 
https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535535#comment-15535535
 ] 

Flavio Pompermaier commented on FLINK-4719:
-------------------------------------------

Using the Flink 1.1.1 code my job fails frequently, instead using the following 
code for the KryoSerializer decrease a lot the frequency of such Exception.
I hope this could help in solving the problem:

{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;

import org.apache.avro.generic.GenericData;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import 
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import org.objenesis.strategy.StdInstantiatorStrategy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A type serializer that serializes its type using the Kryo serialization
 * framework (https://github.com/EsotericSoftware/kryo).
 * 
 * This serializer is intended as a fallback serializer for the cases that are
 * not covered by the basic types, tuples, and POJOs.
 *
 * @param <T> The type to be serialized.
 */
public class KryoSerializer<T> extends TypeSerializer<T> {

        private static final long serialVersionUID = 3L;

        private static final Logger LOG = 
LoggerFactory.getLogger(KryoSerializer.class);

        // 
------------------------------------------------------------------------

        private final LinkedHashMap<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
        private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
registeredTypesWithSerializerClasses;
        private final LinkedHashMap<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
        private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
defaultSerializerClasses;
        private final LinkedHashSet<Class<?>> registeredTypes;

        private final Class<T> type;
        
        // 
------------------------------------------------------------------------
        // The fields below are lazily initialized after duplication or 
deserialization.

        private transient Kryo kryo;
        private transient T copyInstance;
        
        // 
------------------------------------------------------------------------

        public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
                this.type = checkNotNull(type);

                this.defaultSerializers = 
executionConfig.getDefaultKryoSerializers();
                this.defaultSerializerClasses = 
executionConfig.getDefaultKryoSerializerClasses();
                this.registeredTypesWithSerializers = 
executionConfig.getRegisteredTypesWithKryoSerializers();
                this.registeredTypesWithSerializerClasses = 
executionConfig.getRegisteredTypesWithKryoSerializerClasses();
                this.registeredTypes = executionConfig.getRegisteredKryoTypes();
        }

        /**
         * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
         */
        protected KryoSerializer(KryoSerializer<T> toCopy) {
                registeredTypesWithSerializers = 
toCopy.registeredTypesWithSerializers;
                registeredTypesWithSerializerClasses = 
toCopy.registeredTypesWithSerializerClasses;
                defaultSerializers = toCopy.defaultSerializers;
                defaultSerializerClasses = toCopy.defaultSerializerClasses;
                registeredTypes = toCopy.registeredTypes;

                type = toCopy.type;
                if(type == null){
                        throw new NullPointerException("Type class cannot be 
null.");
                }
        }

        // 
------------------------------------------------------------------------

        @Override
        public boolean isImmutableType() {
                return false;
        }

        @Override
        public KryoSerializer<T> duplicate() {
                return new KryoSerializer<T>(this);
        }

        @Override
        public T createInstance() {
                if(Modifier.isAbstract(type.getModifiers()) || 
Modifier.isInterface(type.getModifiers()) ) {
                        return null;
                } else {
                        checkKryoInitialized();
                        try {
                                return kryo.newInstance(type);
                        } catch(Throwable e) {
                                return null;
                        }
                }
        }

        @SuppressWarnings("unchecked")
        @Override
        public T copy(T from) {
                if (from == null) {
                        return null;
                }
                checkKryoInitialized();
                try {
                        return kryo.copy(from);
                }
                catch(KryoException ke) {
                        // kryo was unable to copy it, so we do it through 
serialization:
                        ByteArrayOutputStream baout = new 
ByteArrayOutputStream();
                        Output output = new Output(baout);

                        kryo.writeObject(output, from);

                        output.close();

                        ByteArrayInputStream bain = new 
ByteArrayInputStream(baout.toByteArray());
                        Input input = new Input(bain);
                        T ret = (T)kryo.readObject(input, from.getClass());
                        
                        input.close();

                        return ret;
                }
        }
        
        @Override
        public T copy(T from, T reuse) {
                return copy(from);
        }

        @Override
        public int getLength() {
                return -1;
        }

        @Override
        public void serialize(T record, DataOutputView target) throws 
IOException {
                checkKryoInitialized();
                DataOutputViewStream outputStream = new 
DataOutputViewStream(target);
                Output output = new Output(outputStream);

                try {
                        kryo.writeClassAndObject(output, record);
                }
                catch (KryoException ke) {
                        Throwable cause = ke.getCause();
                        if (cause instanceof EOFException) {
                                throw (EOFException) cause;
                        }
                        else {
                                throw ke;
                        }
                } finally {
                                try{
                                        output.close();
                                } catch (KryoException ke) {
                                                Throwable cause = ke.getCause();

                                                if (cause instanceof 
EOFException) {
                                                        throw (EOFException) 
cause;
                                                } else {
                                                                throw ke;
                                                }
                                }
                }
        }

        @SuppressWarnings("unchecked")
        @Override
        public T deserialize(DataInputView source) throws IOException {
                checkKryoInitialized();
                DataInputViewStream inputStream = new 
DataInputViewStream(source);
                Input input = new NoFetchingInput(inputStream);

                try {
                        return (T) kryo.readClassAndObject(input);
                } catch (KryoException ke) {
                        Throwable cause = ke.getCause();

                        if (cause instanceof EOFException) {
                                throw (EOFException) cause;
                        } else {
                                throw ke;
                        }
                } finally {
                        try{
                                input.close();
                        } catch (KryoException ke) {
                                        Throwable cause = ke.getCause();

                                        if (cause instanceof EOFException) {
                                                throw (EOFException) cause;
                                        } else {
                                                        throw ke;
                                        }
                                }
                }
        }
        
        @Override
        public T deserialize(T reuse, DataInputView source) throws IOException {
                return deserialize(source);
        }

        @Override
        public void copy(DataInputView source, DataOutputView target) throws 
IOException {
                checkKryoInitialized();
                if(this.copyInstance == null){
                        this.copyInstance = createInstance();
                }

                T tmp = deserialize(copyInstance, source);
                serialize(tmp, target);
        }
        
        // 
--------------------------------------------------------------------------------------------
        
        @Override
        public int hashCode() {
                return Objects.hash(
                        type,
                        registeredTypes,
                        registeredTypesWithSerializerClasses,
                        defaultSerializerClasses);
        }
        
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof KryoSerializer) {
                        KryoSerializer<?> other = (KryoSerializer<?>) obj;

                        // we cannot include the Serializers here because they 
don't implement the equals method
                        return other.canEqual(this) &&
                                type == other.type &&
                                registeredTypes.equals(other.registeredTypes) &&
                                
registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses)
 &&
                                
defaultSerializerClasses.equals(other.defaultSerializerClasses);
                } else {
                        return false;
                }
        }

        @Override
        public boolean canEqual(Object obj) {
                return obj instanceof KryoSerializer;
        }

        // 
--------------------------------------------------------------------------------------------

        /**
         * Returns the Chill Kryo Serializer which is implictly added to the 
classpath via flink-runtime.
         * Falls back to the default Kryo serializer if it can't be found.
         * @return The Kryo serializer instance.
         */
        private Kryo getKryoInstance() {

                try {
                        // check if ScalaKryoInstantiator is in class path 
(coming from Twitter's Chill library).
                        // This will be true if Flink's Scala API is used.
                        Class<?> chillInstantiatorClazz = 
Class.forName("com.twitter.chill.ScalaKryoInstantiator");
                        Object chillInstantiator = 
chillInstantiatorClazz.newInstance();

                        // obtain a Kryo instance through Twitter Chill
                        Method m = chillInstantiatorClazz.getMethod("newKryo");

                        return (Kryo) m.invoke(chillInstantiator);
                } catch (ClassNotFoundException | InstantiationException | 
NoSuchMethodException |
                        IllegalAccessException | InvocationTargetException e) {

                        LOG.warn("Falling back to default Kryo serializer 
because Chill serializer couldn't be found.", e);

                        Kryo.DefaultInstantiatorStrategy initStrategy = new 
Kryo.DefaultInstantiatorStrategy();
                        initStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());

                        Kryo kryo = new Kryo();
                        kryo.setInstantiatorStrategy(initStrategy);

                        return kryo;
                }
        }

        private void checkKryoInitialized() {
                if (this.kryo == null) {
                        this.kryo = getKryoInstance();

                        // Enable reference tracking. 
                        kryo.setReferences(true);
                        
                        // Throwable and all subclasses should be serialized 
via java serialization
                        kryo.addDefaultSerializer(Throwable.class, new 
JavaSerializer());

                        // Add default serializers first, so that they type 
registrations without a serializer
                        // are registered with a default serializer
                        for (Map.Entry<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> entry: 
defaultSerializers.entrySet()) {
                                kryo.addDefaultSerializer(entry.getKey(), 
entry.getValue().getSerializer());
                        }

                        for (Map.Entry<Class<?>, Class<? extends 
Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
                                kryo.addDefaultSerializer(entry.getKey(), 
entry.getValue());
                        }

                        // register the type of our class
                        kryo.register(type);

                        // register given types. we do this first so that any 
registration of a
                        // more specific serializer overrides this
                        for (Class<?> type : registeredTypes) {
                                kryo.register(type);
                        }

                        // register given serializer classes
                        for (Map.Entry<Class<?>, Class<? extends 
Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
                                Class<?> typeClass = e.getKey();
                                Class<? extends Serializer<?>> serializerClass 
= e.getValue();

                                Serializer<?> serializer =
                                                
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
                                kryo.register(typeClass, serializer);
                        }

                        // register given serializers
                        for (Map.Entry<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> e : 
registeredTypesWithSerializers.entrySet()) {
                                kryo.register(e.getKey(), 
e.getValue().getSerializer());
                        }
                        // this is needed for Avro but can not be added on 
demand.
                        kryo.register(GenericData.Array.class, new 
SpecificInstanceCollectionSerializerForArrayList());

                        kryo.setRegistrationRequired(false);
                        
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
                }
        }

        // 
--------------------------------------------------------------------------------------------
        // For testing
        // 
--------------------------------------------------------------------------------------------
        
        public Kryo getKryo() {
                checkKryoInitialized();
                return this.kryo;
        }
}
{code}

> KryoSerializer random exception
> -------------------------------
>
>                 Key: FLINK-4719
>                 URL: https://issues.apache.org/jira/browse/FLINK-4719
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.1.1
>            Reporter: Flavio Pompermaier
>              Labels: kryo, serialization
>
> There's a random exception that involves somehow the KryoSerializer when 
> using POJOs in Flink jobs reading large volumes of data.
> It is usually thrown in several places, e.g. (the Exceptions reported here 
> can refer to previous versions of Flink...):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at 
> main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted 
> input: Thread 'SortMerger spilling thread' terminated due to an exception: 
> Unable to find class: java.ttil.HashSet
>         at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>         at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger spilling thread' terminated due to an exception: Unable to 
> find class: java.ttil.HashSet
>         at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>         at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>         at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>         ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
> terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 
> java.ttil.HashSet
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>         at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>         at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>         at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>         at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>         at 
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>         at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>         at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:348)
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the 
> record had. This indicates broken serialization. If you are using custom 
> serialization types (Value or Writable), check their serialization methods. 
> If you are using a Kryo-serialized type, check the corresponding Kryo 
> serializer.
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>     at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>     at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>     at java.util.ArrayList.elementData(ArrayList.java:418)
>     at java.util.ArrayList.get(ArrayList.java:431)
>     at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>     at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>     at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>     at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> {code}
> {code}
> java.lang.RuntimeException: Cannot instantiate class.
>       at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>       at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>       at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>       at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>       at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>       at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>       at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: 
> it.okkam.flink.test.model.pojo.VdhicleEvent
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>       ... 10 more
> {code}
> {code}
> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>         at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>         at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>         at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>         at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>         at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>         at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>         at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>         at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>       at java.util.ArrayList.elementData(ArrayList.java:418)
>       at java.util.ArrayList.get(ArrayList.java:431)
>       at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>       at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>       at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>       at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>       at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>       at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>       at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>       at 
> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>       at 
> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>       at 
> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>       at 
> org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
>       at 
> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
>       at 
> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>       at 
> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>       at 
> org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
>       at 
> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>       at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to