Hi Pedro,

No idea what might be causing it. Do you perhaps have some code to
reproduce it locally?

Best regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero <tuerope...@gmail.com> wrote:

>
> Context: spark-core_2.12-3.1.1
> Testing with Maven and Eclipse.
>
> I'm modifying a project and a test no longer works as expected.
> The difference is in the parameters passed to the aggregateByKey
> function of JavaPairRDD.
>
> JavaSparkContext is created this way:
> new JavaSparkContext(new SparkConf()
>     .setMaster("local[1]")
>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
> Then I construct a JavaPairRDD using sparkContext.parallelizePairs and
> call a method which performs an aggregateByKey over the input JavaPairRDD
> and tests that the result is the expected one.
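> To make this concrete, here is a simplified sketch of the test (the
> data, the lambdas, and the types are made up for illustration; the
> real test aggregates custom types):
>
> import java.util.Arrays;
> import java.util.Map;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.function.Function2;
> import scala.Tuple2;
>
> JavaPairRDD<String, Integer> pairs = sparkContext.parallelizePairs(
>     Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2),
>         new Tuple2<>("b", 3)));
>
> Integer zeroValue = 0;
> // seqFunc: fold a value into the per-partition accumulator.
> Function2<Integer, Integer, Integer> combiner = (acc, v) -> acc + v;
> // combFunc: merge two accumulators across partitions.
> Function2<Integer, Integer, Integer> merger = Integer::sum;
>
> // Works as expected: {a=3, b=3}.
> Map<String, Integer> ok =
>     pairs.aggregateByKey(zeroValue, combiner, merger).collectAsMap();
>
> // Comes back empty when the numPartitions overload is used:
> int partitions = 2;
> Map<String, Integer> empty = pairs
>     .aggregateByKey(zeroValue, partitions, combiner, merger)
>     .collectAsMap();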
>
> When I use JavaPairRDD line 369 (calling .aggregateByKey(zeroValue,
> combiner, merger)):
>   def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>       combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>     implicit val ctag: ClassTag[U] = fakeClassTag
>     fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>   }
> The test works as expected.
> But when I use JavaPairRDD line 355 (calling .aggregateByKey(zeroValue,
> *partitions*, combiner, merger)):
>   def aggregateByKey[U](zeroValue: U, *numPartitions: Int*,
>       seqFunc: JFunction2[U, V, U],
>       combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>     implicit val ctag: ClassTag[U] = fakeClassTag
>     fromRDD(rdd.aggregateByKey(zeroValue, *numPartitions*)(seqFunc, combFunc))
>   }
> The result is always empty. It looks like there is a problem with the
> HashPartitioner created in PairRDDFunctions:
>   def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(
>       seqOp: (U, V) => U,
>       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>     aggregateByKey(zeroValue, *new HashPartitioner(numPartitions)*)(seqOp,
>       combOp)
>   }
> vs.:
>   def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>     aggregateByKey(zeroValue, *defaultPartitioner*(self))(seqOp, combOp)
>   }
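> If the implicitly created HashPartitioner really is the culprit, one
> experiment (just a sketch, reusing the simplified names from above)
> would be to pass the same partitioner explicitly, via the overload that
> takes a Partitioner, and see whether that call also comes back empty:
>
> import org.apache.spark.HashPartitioner;
>
> // Should behave exactly like the numPartitions overload if the
> // HashPartitioner is the only difference:
> JavaPairRDD<String, Integer> viaExplicitPartitioner = pairs.aggregateByKey(
>     zeroValue, new HashPartitioner(partitions), combiner, merger);
>
> If that also returns nothing, the partitioner path is confirmed as the
> difference; if it works, the problem is elsewhere in the numPartitions
> overload.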
> I can't debug it properly with Eclipse: the error occurs while the
> threads are in Spark code, and Eclipse can't open the source ("System
> Editor can only open file base resources").
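> One thing I can still do without the debugger is print how the keys end
> up distributed across partitions, to see whether the rows disappear at
> the partitioning step (again just a sketch, same simplified names):
>
> import java.util.List;
>
> // Repartition exactly as the failing overload would, then inspect the
> // contents of each partition on the driver:
> List<List<Tuple2<String, Integer>>> byPartition = pairs
>     .partitionBy(new HashPartitioner(partitions))
>     .glom()
>     .collect();
> for (int i = 0; i < byPartition.size(); i++) {
>     System.out.println("partition " + i + ": " + byPartition.get(i));
> }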
>
> Does anyone know how to resolve this issue?
>
> Thanks in advance,
> Pedro.
>
>
>
>
