OK here's what I tried:

* Build Flink (mvn clean install) from the branch you linked (kryo)
* Build my uber-jar with SBT, using 1.0-SNAPSHOT as the Flink version; I
added the local Maven repo to the resolvers so that it picks up the
previously installed version (I hope)
* Launch a local cluster from the newly built Flink and try to run the job

Still getting the same error.

Is there a way to ensure that SBT is picking up the local version of Flink
to build the uber-jar?
Does it matter in this case, or is it enough that I'm sure the launched
Flink instance comes from the branch you linked?
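
A minimal sketch of the SBT setup described above, in case it helps pin
down the question. The Scala version and the artifact IDs are assumptions
(placeholders, not copied from the actual build); the real IDs are whatever
`mvn clean install` wrote under ~/.m2/repository/org/apache/flink:

```scala
// build.sbt (sketch only; the Scala version and module names are assumptions)
scalaVersion := "2.10.4"

// Let sbt resolve from the local Maven repository (~/.m2/repository),
// which is where `mvn clean install` places the freshly built artifacts.
resolvers += Resolver.mavenLocal

// Must match the version of the locally installed Flink build.
val flinkVersion = "1.0-SNAPSHOT"

libraryDependencies ++= Seq(
  // Hypothetical artifact IDs: check the directory names under
  // ~/.m2/repository/org/apache/flink for the ones actually installed.
  "org.apache.flink" % "flink-scala"   % flinkVersion,
  "org.apache.flink" % "flink-clients" % flinkVersion,
  "org.apache.flink" % "flink-ml"      % flinkVersion
)
```

Running `sbt "show compile:dependencyClasspath"` (sbt 0.13 syntax) is one
way to inspect which Flink jars ended up on the classpath.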

On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <se...@apache.org> wrote:

> The bug looks to be in the serialization via Kryo while spilling windows.
> Note that Kryo is used here as a fallback serializer, since
> SparseVector is not a transparent type to Flink.
>
> I think there are two possible reasons:
>   1) Kryo, or our Kryo setup, has an issue here
>   2) Kryo is inconsistently configured. There are multiple Kryo instances
> used across the serializers in the sorter. There may be a bug that they are
> not initialized in sync.
>
>
> To check this, can you build Flink with this pull request (
> https://github.com/apache/flink/pull/1528) or from this branch (
> https://github.com/StephanEwen/incubator-flink kryo) and see if that
> fixes it?
>
>
> Thanks,
> Stephan
>
> On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> I haven't been able to reproduce this with other datasets. Taking a
>> smaller sample from the large dataset I'm using (link to data
>> <http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>)
>> causes the same problem however.
>>
>> I'm wondering if the implementation of readLibSVM is what's wrong here.
>> I've tried the new version committed recently by Chiwan, but I still get the
>> same error.
>>
>> I'll see if I can spot a bug in readLibSVM.
>>
>> On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis <
>> theodoros.vasilou...@gmail.com> wrote:
>>
>>> It's on 0.10.
>>>
>>> I've tried explicitly registering SparseVector (which is done anyway by
>>> registerFlinkMLTypes
>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>
>>> which is called when the SVM predict or evaluate functions are called
>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>)
>>> in my job but I still get the same error. I will try a couple of different datasets
>>> and try to see if it's the number of features that is causing this or
>>> something else.
>>>
>>> So far it works fine for a dataset with 8 features, but the large one
>>> has 2000 and I get the above error there. I will try large datasets with a
>>> few features and small datasets with many features as well.
>>>
>>> On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> Does this error occur in 0.10 or in 1.0-SNAPSHOT?
>>>>
>>>> It is probably an incorrectly configured Kryo instance (not a problem
>>>> of the sorter).
>>>> What is strange is that it occurs in the "MapReferenceResolver" -
>>>> there should be no reference resolution during serialization /
>>>> deserialization.
>>>>
>>>> Can you try what happens when you explicitly register the type
>>>> SparseVector at the ExecutionEnvironment?
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis <
>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I'm trying to run a job using FlinkML and I'm confused about the
>>>>> source of an error.
>>>>>
>>>>> The job reads a libSVM formatted file and trains an SVM classifier on
>>>>> it.
>>>>>
>>>>> I've tried this with small datasets and everything works out fine.
>>>>>
>>>>> When trying to run the same job on a large dataset (~11GB
>>>>> uncompressed) however, I get the following error:
>>>>>
>>>>>
>>>>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>>>>>> 'SortMerger spilling thread' terminated due to an exception:
>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>> Serialization trace:
>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>>>>>         at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>>>>>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>>>>>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>>>>> terminated due to an exception:
>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>> Serialization trace:
>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>> Caused by: com.esotericsoftware.kryo.KryoException:
>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>> Serialization trace:
>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
>>>>>>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>         at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>>>>>         at java.util.ArrayList.set(ArrayList.java:444)
>>>>>>         at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
>>>>>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
>>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>>>>>>         ... 10 more
>>>>>
>>>>>
>>>>>
>>>>> Any idea what might be causing this? I'm running the job in local
>>>>> mode, 1 TM with 8 slots and ~32GB heap size.
>>>>>
>>>>> All the vectors created by the libSVM loader have the correct size.
>>>>>
>>>>
>>>>
>>>
>>
>
