It's on 0.10.

I've tried explicitly registering SparseVector (which is done anyway by
registerFlinkMLTypes
<https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>
which is called when the SVM predict or evaluate functions are called
<https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>)
in my job but I still get the same. I will try a couple different datasets
and try to see if it's the number of features that is causing this or
something else.

So far it works fine for a dataset with 8 features, but the large one has
2000 and I get the above error there. I will try large datasets with a few
features and small datasets with many features as well.

On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Does this error occur in 0.10 or im 1.0-SNAPSHOT?
>
> It is probably an incorrectly configured Kryo instance (not a problem of
> the sorter).
> What is strange is that it occurs in the "MapReferenceResolver" - there
> should be no reference resolution during serialization / deserialization.
>
> Can you try what happens when you explicitly register the type
> SparseVector at the ExecutionEnvironment?
>
> Stephan
>
>
> On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> Hello all,
>>
>> I'm trying to run a job using FlinkML and I'm confused about the source
>> of an error.
>>
>> The job reads a libSVM formatted file and trains an SVM classifier on it.
>>
>> I've tried this with small datasets and everything works out fine.
>>
>> When trying to run the same job on a large dataset (~11GB uncompressed)
>> however, I get the following error:
>>
>>
>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>>> 'SortMerger spilling thread' terminated due to an exception:
>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>> Serialization trace:
>>> indices (org.apache.flink.ml.math.SparseVector)
>>>         at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>         at
>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>>         at
>>> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>>>         at
>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>>>         at
>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>> terminated due to an exception: java.lang.IndexOutOfBoundsException: Index:
>>> 14, Size: 2
>>> Serialization trace:
>>> indices (org.apache.flink.ml.math.SparseVector)
>>>         at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>> Caused by: com.esotericsoftware.kryo.KryoException:
>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>> Serialization trace:
>>> indices (org.apache.flink.ml.math.SparseVector)
>>>         at
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>         at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>         at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>         at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>>>         at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>>>         at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
>>>         at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>         at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>         at
>>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>         at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>         at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>>         at java.util.ArrayList.set(ArrayList.java:444)
>>>         at
>>> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
>>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
>>>         at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
>>>         at
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>>>         ... 10 more
>>
>>
>>
>> Any idea what might be causing this? I'm running the job in local mode, 1
>> TM with 8 slots and ~32GB heap size.
>>
>> All the vectors created by the libSVM loader have the correct size.
>>
>
>

Reply via email to