Hi Pedro,

No idea what might be causing it. Do you perhaps have some code to reproduce it locally?
Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero <tuerope...@gmail.com> wrote:

> Context: spark-core_2.12-3.1.1
> Testing with Maven and Eclipse.
>
> I'm modifying a project and a test stops working as expected.
> The difference is in the parameters passed to the aggregateByKey function
> of JavaPairRDD.
>
> The JavaSparkContext is created this way:
>
>   new JavaSparkContext(new SparkConf()
>       .setMaster("local[1]")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
>
> Then I construct a JavaPairRDD using sparkContext.parallelizePairs and
> call a method that performs an aggregateByKey over the input JavaPairRDD,
> and test that the result is the expected one.
>
> When I use the overload at JavaPairRDD line 369 (calling
> .aggregateByKey(zeroValue, combiner, merger)):
>
>   def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>       combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>     implicit val ctag: ClassTag[U] = fakeClassTag
>     fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>   }
>
> the test works as expected.
> But when I use the overload at JavaPairRDD line 355 (calling
> .aggregateByKey(zeroValue, partitions, combiner, merger)):
>
>   def aggregateByKey[U](zeroValue: U, numPartitions: Int,
>       seqFunc: JFunction2[U, V, U],
>       combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>     implicit val ctag: ClassTag[U] = fakeClassTag
>     fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc, combFunc))
>   }
>
> the result is always empty. It looks like there is a problem with the
> HashPartitioner created in PairRDDFunctions:
>
>   def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(
>       seqOp: (U, V) => U,
>       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>     aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
>   }
>
> vs:
>
>   def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>     aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
>   }
>
> I can't debug it properly with Eclipse; the error occurs while the
> threads are in Spark code (Eclipse reports "system editor can only open
> file base resources").
>
> Does anyone know how to resolve this issue?
>
> Thanks in advance,
> Pedro.
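For reference, a minimal standalone repro along the lines Pedro describes might look like the sketch below. The class name, test data, and the aggregation functions are illustrative, not taken from the original project; the two aggregateByKey calls correspond to the two overloads discussed above (without and with numPartitions).

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class AggregateByKeyRepro {
        public static void main(String[] args) {
            // Same configuration as in the report; an app name is added
            // because SparkConf requires one.
            JavaSparkContext sc = new JavaSparkContext(new SparkConf()
                .setAppName("aggregateByKey-repro")
                .setMaster("local[1]")
                .set("spark.serializer",
                     "org.apache.spark.serializer.KryoSerializer"));

            // Illustrative test data, not from the original project.
            List<Tuple2<String, Integer>> data = Arrays.asList(
                new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3));
            JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(data);

            // Overload without numPartitions (delegates to defaultPartitioner):
            // reported to work as expected.
            JavaPairRDD<String, Integer> withDefaultPartitioner =
                pairs.aggregateByKey(0, Integer::sum, Integer::sum);

            // Overload with numPartitions (delegates to
            // new HashPartitioner(numPartitions)): reported to return an
            // empty result in the setup described above.
            JavaPairRDD<String, Integer> withNumPartitions =
                pairs.aggregateByKey(0, 2, Integer::sum, Integer::sum);

            System.out.println(withDefaultPartitioner.collectAsMap());
            System.out.println(withNumPartitions.collectAsMap());

            sc.stop();
        }
    }

If the second call prints an empty map while the first prints {a=3, b=3}, that would isolate the behavior to the numPartitions overload described in the report.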