It's on 0.10. I've tried explicitly registering SparseVector (which is done anyway by registerFlinkMLTypes <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49> which is called when the SVM predict or evaluate functions are called <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>) in my job but I still get the same. I will try a couple different datasets and try to see if it's the number of features that is causing this or something else.
So far it works fine for a dataset with 8 features, but the large one has 2000 and I get the above error there. I will try large datasets with a few features and small datasets with many features as well. On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org> wrote: > Hi! > > Does this error occur in 0.10 or im 1.0-SNAPSHOT? > > It is probably an incorrectly configured Kryo instance (not a problem of > the sorter). > What is strange is that it occurs in the "MapReferenceResolver" - there > should be no reference resolution during serialization / deserialization. > > Can you try what happens when you explicitly register the type > SparseVector at the ExecutionEnvironment? > > Stephan > > > On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis < > theodoros.vasilou...@gmail.com> wrote: > >> Hello all, >> >> I'm trying to run a job using FlinkML and I'm confused about the source >> of an error. >> >> The job reads a libSVM formatted file and trains an SVM classifier on it. >> >> I've tried this with small datasets and everything works out fine. >> >> When trying to run the same job on a large dataset (~11GB uncompressed) >> however, I get the following error: >> >> >>> java.lang.RuntimeException: Error obtaining the sorted input: Thread >>> 'SortMerger spilling thread' terminated due to an exception: >>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2 >>> Serialization trace: >>> indices (org.apache.flink.ml.math.SparseVector) >>> at >>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) >>> at >>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089) >>> at >>> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78) >>> at >>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489) >>> at >>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>> at java.lang.Thread.run(Thread.java:745) >>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' >>> terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: >>> 14, Size: 2 >>> Serialization trace: >>> indices (org.apache.flink.ml.math.SparseVector) >>> at >>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800) >>> Caused by: com.esotericsoftware.kryo.KryoException: >>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2 >>> Serialization trace: >>> indices (org.apache.flink.ml.math.SparseVector) >>> at >>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) >>> at >>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) >>> at >>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) >>> at >>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222) >>> at >>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236) >>> at >>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246) >>> at >>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73) >>> at >>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73) >>> at >>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499) >>> at >>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344) >>> at >>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) >>> Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2 >>> at java.util.ArrayList.rangeCheck(ArrayList.java:653) >>> at java.util.ArrayList.set(ArrayList.java:444) >>> at >>> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38) >>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823) >>> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731) >>> at >>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113) >>> ... 10 more >> >> >> >> Any idea what might be causing this? I'm running the job in local mode, 1 >> TM with 8 slots and ~32GB heap size. >> >> All the vectors created by the libSVM loader have the correct size. >> > >