I haven't looked too deeply into this, but it sounds like object reuse is
enabled, in which case buffering values effectively causes you to store
the same object multiple times.
Can you try disabling object reuse using
env.getConfig().disableObjectReuse()?
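To illustrate the effect I suspect, here is a minimal plain-Scala sketch
with no Flink dependency. The Record class and the reusingIterator helper
are made up for this illustration; they only mimic an iterator that hands
out one mutable object over and over, and are not Flink APIs.

```scala
// Plain-Scala simulation of an iterator that reuses a single mutable
// record, which is roughly what object reuse permits a runtime to do.
// Record and reusingIterator are hypothetical names, not Flink classes.
final class Record(var value: Int)

object ObjectReuseDemo {
  // Returns the SAME Record instance on every step, mutated in place.
  def reusingIterator(values: Seq[Int]): Iterator[Record] = {
    val shared = new Record(0)
    values.iterator.map { v => shared.value = v; shared }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(1, 2, 3)

    // Buffering references: every slot points at the one shared object,
    // so all entries show whatever was written last.
    val aliased = reusingIterator(input).toList.map(_.value)

    // Copying the needed data out during iteration keeps values distinct.
    val copied = reusingIterator(input).map(_.value).toList

    println(s"aliased = $aliased") // aliased = List(3, 3, 3)
    println(s"copied  = $copied")  // copied  = List(1, 2, 3)
  }
}
```

If this is what is happening in your job, either disabling object reuse
or copying each record before buffering it should make the results stable.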
On 22.03.2016 16:53, Sergio Ramírez wrote:
Hi all,
I've been having some problems with RichMapPartitionFunction. First, I
tried to convert the iterable into an array, without success. Then I
used buffers to store the values per column. I am trying to transpose
the local matrix of LabeledVectors that I have in each partition.
Neither of these solutions has worked. For example, for partition 7 and
feature 10, the vector is empty, whereas for the same partition and
feature 11, the vector contains 200 elements. This changes on each
execution, affecting different partitions and features.
I think there is a problem with calling the collect method outside the
loop over the iterable.
new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
  def mapPartition(it: java.lang.Iterable[LabeledVector],
                   out: Collector[((Int, Int), Array[Byte])]): Unit = {
    val index = getRuntimeContext().getIndexOfThisSubtask()
    val mat = for (i <- 0 until nFeatures) yield new scala.collection.mutable.ListBuffer[Byte]
    for (reg <- it.asScala) {
      for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
      mat(nFeatures - 1) += classMap(reg.label)
    }
    for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray) // numPartitions
  }
}
Regards