Hi Xiangrui,

I'm sorry, I overlooked your mail.
What I did is a workaround that only works for my special case.
It does not scale and only works for small data sets, but that is fine
for me so far.

Kind Regards,
Niklas

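  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD

  // Zips two RDDs by first collapsing each into a single partition and then
  // trimming the larger one so both hold the same number of elements.
  def securlyZipRdds[A, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
    val rdd1Repartitioned = rdd1.repartition(1)
    val rdd2Repartitioned = rdd2.repartition(1)
    val (rdd1Balanced, rdd2Balanced) =
      balanceRddSizes(rdd1Repartitioned, rdd2Repartitioned)
    rdd1Balanced.zip(rdd2Balanced)
  }

  // Removes randomly chosen elements from the larger RDD until both counts match.
  def balanceRddSizes[A, B](rdd1: RDD[A], rdd2: RDD[B]): (RDD[A], RDD[B]) = {
    val rdd1count = rdd1.count()
    val rdd2count = rdd2.count()
    val difference = math.abs(rdd1count - rdd2count).toInt
    if (rdd1count > rdd2count) {
      (removeRandomElements(rdd1, difference), rdd2)
    } else if (rdd2count > rdd1count) {
      (rdd1, removeRandomElements(rdd2, difference))
    } else {
      (rdd1, rdd2)
    }
  }

  // Samples numberOfElements values on the driver and filters them out.
  // Note: filtering by value also drops duplicates of the sampled values,
  // so more than numberOfElements may be removed if the RDD has duplicates.
  def removeRandomElements[A](rdd: RDD[A], numberOfElements: Int): RDD[A] = {
    val sample: Array[A] = rdd.takeSample(withReplacement = false, numberOfElements)
    val set: Set[A] = Set(sample: _*)
    rdd.filter(x => !set.contains(x))
  }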
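
For larger data sets, one possible alternative is to pair elements by
position instead of relying on zip. The following is only a sketch, not
something from the code above: the object name ZipByIndex and the helper
zipByIndex are hypothetical, and it assumes that dropping the surplus
elements at the end of the longer RDD is acceptable (they are not removed
at random). It uses RDD.zipWithIndex and the pair-RDD join, so it needs a
shuffle but no repartitioning into a single partition.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark versions before 1.3

// Hypothetical helper object, shown for illustration only.
object ZipByIndex {
  // Pairs elements by their position without requiring that the two RDDs
  // have the same number of partitions or equally sized partitions.
  def zipByIndex[A: ClassTag, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
    // zipWithIndex assigns each element its position (0, 1, 2, ...) in the RDD.
    val indexed1: RDD[(Long, A)] = rdd1.zipWithIndex().map(_.swap)
    val indexed2: RDD[(Long, B)] = rdd2.zipWithIndex().map(_.swap)
    // The inner join keeps only indices present in both RDDs, so surplus
    // elements at the end of the longer RDD are dropped.
    indexed1.join(indexed2).values
  }
}
```

The result is not ordered by index after the join. That should not matter
for drawing random pairs, but it does if the original order is needed.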

On 10.01.2015 06:56, Xiangrui Meng wrote:
> "sample 2 * n tuples, split them into two parts, balance the sizes of
> these parts by filtering some tuples out"
> 
> How do you guarantee that the two RDDs have the same size?
> 
> -Xiangrui
> 
> On Fri, Jan 9, 2015 at 3:40 AM, Niklas Wilcke
> <1wil...@informatik.uni-hamburg.de> wrote:
>> Hi Spark community,
>>
>> I have a problem with zipping two RDDs of the same size and same number of
>> partitions.
>> The error message says that zipping is only allowed on RDDs which are
>> partitioned into chunks of exactly the same sizes.
>> How can I ensure this? My workaround at the moment is to repartition both
>> RDDs to only one partition, but that obviously
>> does not scale.
>>
>> This problem arises from my attempt to draw n random tuple pairs (Tuple,
>> Tuple) from an RDD[Tuple].
>> What I do is sample 2 * n tuples, split them into two parts, balance the
>> sizes of these parts
>> by filtering some tuples out, and zip them together.
>>
>> I would appreciate hearing about better approaches for both problems.
>>
>> Thanks in advance,
>> Niklas
