Adaptive behavior of Spark at different network transfer rates?
Hello,

I'm seeing strange behavior in a larger data processing pipeline consisting of multiple steps that involve Spark core and GraphX. When I increase the network transfer rate in the 5-node cluster from 100 Mbit/s to 1 Gbit/s, the runtime also increases, from around 15 minutes to 19 minutes. This only holds for large input files; on small files the faster transfer rate decreases the runtime by around one third.

I tested the network transfer rate by transmitting files from node to node. At 100 Mbit/s I get 11.7 MByte/s and at 1 Gbit/s I get 67 MByte/s, so the network itself should not be the cause.

My question is: do Spark, and especially GraphX, adapt their behavior to the available network transfer rate? Does anybody have an idea how a faster network could decrease performance?

Thank you very much!

Kind regards,
Niklas Wilcke
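One knob that might be worth ruling out (my own suggestion, not something from this thread): Spark's locality-aware task scheduling is time-based, governed by the spark.locality.wait setting, so scheduling decisions rather than raw bandwidth can shift when transfer timings change. A minimal sketch of how one might pin it for a test run; the "10s" value is an arbitrary illustration, not a recommendation:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical experiment: pin the locality wait so that task placement
    // behaves identically at both link speeds. spark.locality.wait controls
    // how long the scheduler waits for a data-local slot before falling
    // back to a less local one.
    val conf = new SparkConf()
      .setAppName("locality-wait-experiment")
      .set("spark.locality.wait", "10s")
    val sc = new SparkContext(conf)

If the runtimes converge with the wait pinned high or low, that would point at scheduling rather than the network itself.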
Re: Zipping RDDs of equal size not possible
Hi Xiangrui,

I'm sorry, I didn't notice your mail. What I did is a workaround that only works for my special case. It does not scale and only works for small data sets, but that is fine for me so far.

Kind regards,
Niklas

    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    def securlyZipRdds[A, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
      // Collapse both RDDs to a single partition so that zip's
      // equal-partition-sizes requirement reduces to equal counts.
      val rdd1Repartitioned = rdd1.repartition(1)
      val rdd2Repartitioned = rdd2.repartition(1)
      val (rdd1Balanced, rdd2Balanced) =
        balanceRddSizes(rdd1Repartitioned, rdd2Repartitioned)
      rdd1Balanced.zip(rdd2Balanced)
    }

    def balanceRddSizes[A, B](rdd1: RDD[A], rdd2: RDD[B]): (RDD[A], RDD[B]) = {
      val rdd1count = rdd1.count()
      val rdd2count = rdd2.count()
      val difference = math.abs(rdd1count - rdd2count).toInt
      if (rdd1count > rdd2count) {
        (removeRandomElements(rdd1, difference), rdd2)
      } else if (rdd2count > rdd1count) {
        (rdd1, removeRandomElements(rdd2, difference))
      } else {
        (rdd1, rdd2)
      }
    }

    // Note: this assumes the RDD's elements are distinct; with duplicates,
    // the filter could remove more than numberOfElements elements.
    def removeRandomElements[A](rdd: RDD[A], numberOfElements: Int): RDD[A] = {
      val sample: Array[A] = rdd.takeSample(false, numberOfElements)
      val set: Set[A] = Set(sample: _*)
      rdd.filter(x => !set.contains(x))
    }

On 10.01.2015 06:56, Xiangrui Meng wrote:
> > sample 2 * n tuples, split them into two parts, balance the sizes of
> > these parts by filtering some tuples out
>
> How do you guarantee that the two RDDs have the same size? -Xiangrui
>
> On Fri, Jan 9, 2015 at 3:40 AM, Niklas Wilcke
> 1wil...@informatik.uni-hamburg.de wrote:
>> Hi Spark community,
>>
>> I have a problem with zipping two RDDs of the same size and the same
>> number of partitions. The error message says that zipping is only
>> allowed on RDDs which are partitioned into chunks of exactly the same
>> sizes. How can I ensure this? My workaround at the moment is to
>> repartition both RDDs to a single partition, but that obviously does
>> not scale.
>>
>> The problem originates from my need to draw n random tuple pairs
>> (Tuple, Tuple) from an RDD[Tuple]. What I do is sample 2 * n tuples,
>> split them into two parts, balance the sizes of these parts by
>> filtering some tuples out, and zip them together. I would appreciate
>> reading about better approaches to both problems.
>>
>> Thanks in advance,
>> Niklas
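For what it's worth, here is a sketch of a more scalable variant (my own suggestion, not something discussed in the thread): index both RDDs with zipWithIndex and join on the index, which pairs elements by position without collapsing everything into one partition. The function name is illustrative.

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    // Sketch: positionally zip two RDDs of equal count. zipWithIndex
    // assigns each element a stable Long index; joining on that index
    // avoids zip's equal-partition-sizes requirement entirely.
    def zipByIndex[A: ClassTag, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
      val indexed1 = rdd1.zipWithIndex().map(_.swap) // RDD[(Long, A)]
      val indexed2 = rdd2.zipWithIndex().map(_.swap) // RDD[(Long, B)]
      indexed1.join(indexed2).values
    }

The join shuffles both RDDs, so it is not free, but it keeps the parallelism.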
Zipping RDDs of equal size not possible
Hi Spark community,

I have a problem with zipping two RDDs of the same size and the same number of partitions. The error message says that zipping is only allowed on RDDs which are partitioned into chunks of exactly the same sizes. How can I ensure this? My workaround at the moment is to repartition both RDDs to a single partition, but that obviously does not scale.

The problem originates from my need to draw n random tuple pairs (Tuple, Tuple) from an RDD[Tuple]. What I do is sample 2 * n tuples, split them into two parts, balance the sizes of these parts by filtering some tuples out, and zip them together. I would appreciate reading about better approaches to both problems.

Thanks in advance,
Niklas
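One direct way the random-pair problem could be attacked, at least for small n (a sketch of my own, with illustrative names): sample 2 * n elements in a single driver-side call and pair consecutive sampled elements locally, sidestepping zip entirely.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Sketch: draw n random pairs by sampling 2 * n elements without
    // replacement and pairing consecutive sampled elements on the
    // driver. Only suitable when the sample fits in driver memory.
    def randomPairs[T](sc: SparkContext, rdd: RDD[T], n: Int): RDD[(T, T)] = {
      val sample = rdd.takeSample(withReplacement = false, num = 2 * n)
      val pairs = sample.grouped(2).collect { case Array(a, b) => (a, b) }.toSeq
      sc.parallelize(pairs)
    }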
Re: unable to make a custom class as a key in a pairrdd
Hi Jao,

I don't really know why this doesn't work, but I have two hints. You don't need to override hashCode and equals; the case modifier does that for you. Writing

    case class PersonID(id: String)

would be enough to get the class you want, I think. Also, if I change the type of the id param to Int it works for me, but I don't know why:

    case class PersonID(id: Int)

Looks like strange behavior to me. Give it a try.

Good luck,
Niklas

On 23.10.2014 21:52, Jaonary Rabarisoa wrote:
> Hi all,
>
> I have the following case class that I want to use as a key in a
> key-value RDD. I defined the equals and hashCode methods but it's not
> working. What am I doing wrong?
>
>   case class PersonID(id: String) {
>     override def hashCode = id.hashCode
>     override def equals(other: Any) = other match {
>       case that: PersonID => this.id == that.id && this.getClass == that.getClass
>       case _ => false
>     }
>   }
>
>   val p = sc.parallelize((1 until 10).map(x => (PersonID("1"), x)))
>   p.groupByKey.collect foreach println
>
>   (PersonID(1),CompactBuffer(5))
>   (PersonID(1),CompactBuffer(6))
>   (PersonID(1),CompactBuffer(7))
>   (PersonID(1),CompactBuffer(8, 9))
>   (PersonID(1),CompactBuffer(1))
>   (PersonID(1),CompactBuffer(2))
>   (PersonID(1),CompactBuffer(3))
>   (PersonID(1),CompactBuffer(4))
>
> Best,
> Jao
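To make the hint concrete, here is a minimal self-contained sketch (my own illustration, not from the thread) relying on the compiler-generated equals/hashCode of a bare case class. Note that the class sits at the top level of a compiled application; if the original snippet was run in the spark-shell, case classes defined in the REPL are, as far as I know, a known source of exactly this symptom (see SPARK-2620).

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    // A bare case class gets structural equals and hashCode from the
    // compiler, so it can serve as a pair-RDD key directly.
    case class PersonID(id: String)

    object PersonIDKeyExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("case-class-key").setMaster("local[*]"))
        val p = sc.parallelize((1 until 10).map(x => (PersonID("1"), x)))
        // Expected: a single group containing all nine values.
        p.groupByKey().collect().foreach(println)
        sc.stop()
      }
    }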