[jira] [Commented] (SPARK-2104) RangePartitioner should use user specified serializer to serialize range bounds
[ https://issues.apache.org/jira/browse/SPARK-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045419#comment-14045419 ]

Reynold Xin commented on SPARK-2104:

That sounds good!

RangePartitioner should use user specified serializer to serialize range bounds
---
Key: SPARK-2104
URL: https://issues.apache.org/jira/browse/SPARK-2104
Project: Spark
Issue Type: Bug
Reporter: Reynold Xin

Otherwise it is pretty annoying to do a sort on types that are not Java serializable. To reproduce, just set the serializer to Kryo, and run the following job:

{code}
class JavaNonSerializableClass extends Comparable[JavaNonSerializableClass] {
  override def compareTo(o: JavaNonSerializableClass) = 0
}
sc.parallelize(Seq(new JavaNonSerializableClass, new JavaNonSerializableClass), 2).map(x => (x, x)).sortByKey()
{code}

Basically the partitioner will always be serialized using Java (by the task closure serializer). However, the rangeBounds variable in RangePartitioner should be serialized with the user specified serializer.

--
This message was sent by Atlassian JIRA (v6.2#6252)
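The idea in the description (let Java serialize the partitioner object, but route the range bounds through a different serializer) can be sketched without Spark. This is only an illustration under stated assumptions: `KeyCodec`, `PlainKey`, `PlainKeyCodec`, `BoundsHolder` and `JavaRoundTrip` are hypothetical names invented for this sketch, and `KeyCodec` merely stands in for whatever serializer the user has configured. It is not Spark's API and not the fix that was eventually committed.

```scala
import java.io._

// Hypothetical stand-in for the user-specified serializer; not a Spark API.
trait KeyCodec[K] extends Serializable {
  def write(out: DataOutput, key: K): Unit
  def read(in: DataInput): K
}

// A key type that deliberately does not implement java.io.Serializable.
final class PlainKey(val id: Int)

final class PlainKeyCodec extends KeyCodec[PlainKey] {
  def write(out: DataOutput, key: PlainKey): Unit = out.writeInt(key.id)
  def read(in: DataInput): PlainKey = new PlainKey(in.readInt())
}

// Java-serializable wrapper whose bounds bypass default Java serialization.
final class BoundsHolder(initial: Array[PlainKey], codec: KeyCodec[PlainKey])
  extends Serializable {

  // Transient, so defaultWriteObject() skips this field.
  @transient private var bounds: Array[PlainKey] = initial

  def ids: List[Int] = bounds.map(_.id).toList

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()            // writes the codec field
    out.writeInt(bounds.length)         // then the bounds, via the codec
    bounds.foreach(codec.write(out, _))
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()              // restores the codec field
    val n = in.readInt()
    bounds = Array.fill(n)(codec.read(in))
  }
}

object JavaRoundTrip {
  // Serializes and deserializes a value with plain Java serialization.
  def apply[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)) {
      // Resolve classes through the value's own class loader so the sketch
      // also works under REPLs and build tools with layered class loaders.
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, value.getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Passing an `Array[PlainKey]` straight to `JavaRoundTrip` fails with a `NotSerializableException`, which mirrors the reported problem, while a `BoundsHolder` wrapping the same keys survives the round trip because its bounds never go through default Java serialization.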
[jira] [Commented] (SPARK-2104) RangePartitioner should use user specified serializer to serialize range bounds
[ https://issues.apache.org/jira/browse/SPARK-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045430#comment-14045430 ]

Saisai Shao commented on SPARK-2104:

Ok, got it. I will try to fix this issue :)
[jira] [Commented] (SPARK-2104) RangePartitioner should use user specified serializer to serialize range bounds
[ https://issues.apache.org/jira/browse/SPARK-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045504#comment-14045504 ]

Reynold Xin commented on SPARK-2104:

BTW I have some old code I wrote -- you can base your changes on this

{code}
/**
 * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
 * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
 *
 * Note that the actual number of partitions created by the RangePartitioner might not be the same
 * as the `partitions` parameter, in the case where the number of sampled records is less than
 * the value of `partitions`.
 */
class RangePartitioner[K : Ordering : ClassTag, V](
    var partitions: Int,
    @transient rdd: RDD[_ <: Product2[K,V]],
    private val ascending: Boolean = true)
  extends Partitioner {

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  var rangeBounds: Array[K] = {
    if (partitions == 1) {
      Array()
    } else {
      val rddSize = rdd.count()
      val maxSampleSize = partitions * 20.0
      val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted
      if (rddSample.length == 0) {
        Array()
      } else {
        val bounds = new Array[K](partitions - 1)
        for (i <- 0 until partitions - 1) {
          val index = (rddSample.length - 1) * (i + 1) / partitions
          bounds(i) = rddSample(index)
        }
        bounds
      }
    }
  }

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    val sfactory = SparkEnv.get.serializer
    // Treat java serializer with default action rather than going thru serialization, to avoid a
    // separate serialization header.
    sfactory match {
      case js: JavaSerializer => out.defaultWriteObject()
      case _ =>
        out.writeInt(partitions)
        val ser = sfactory.newInstance()
        Utils.serializeViaNestedStream(out, ser) { stream =>
          stream.writeObject(ordering)
          stream.writeObject(scala.reflect.classTag[K])
          stream.writeObject(rangeBounds)
        }
    }
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => in.defaultReadObject()
      case _ =>
        partitions = in.readInt()
        val ser = sfactory.newInstance()
        Utils.deserializeViaNestedStream(in, ser) { ds =>
          println(ds)
          ordering = ds.readObject[Ordering[K]]()
          implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
          rangeBounds = ds.readObject[Array[K]]()(classTag)
          binarySearch = CollectionsUtils.makeBinarySearch[K]
        }
    }
  }

  def numPartitions = rangeBounds.length + 1

  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length < 1000) {
      // With fewer than 1000 range bounds, use a naive linear search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  override def equals(other: Any): Boolean = other match {
    case r: RangePartitioner[_,_] =>
      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
    case _ =>
      false
  }

  override def hashCode(): Int = {
    val prime = 31
    var result = 1
    var i = 0
    while (i < rangeBounds.length) {
      result = prime * result + rangeBounds(i).hashCode
      i += 1
    }
    result = prime * result + ascending.hashCode
    result
  }
}
{code}
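The partition lookup in `getPartition` relies on the convention that a binary search returns either the index of a match or `-(insertion point) - 1`. A minimal standalone sketch of that mapping, assuming `Int` keys and using `java.util.Arrays.binarySearch` in place of Spark's `CollectionsUtils.makeBinarySearch` (the object and method names here are hypothetical), might look like this:

```scala
import java.util.Arrays

object RangeLookup {
  // Binary-search variant. `bounds` must be sorted ascending; bounds(i) is the
  // inclusive upper bound of partition i, and keys above the last bound land
  // in partition bounds.length.
  def partitionFor(bounds: Array[Int], key: Int, ascending: Boolean = true): Int = {
    var p = Arrays.binarySearch(bounds, key)
    // A negative result encodes -(insertion point) - 1; decode it.
    if (p < 0) p = -p - 1
    if (p > bounds.length) p = bounds.length
    if (ascending) p else bounds.length - p
  }

  // Naive linear variant, the one the quoted code uses for small bound arrays.
  def linearPartitionFor(bounds: Array[Int], key: Int): Int = {
    var p = 0
    while (p < bounds.length && key > bounds(p)) p += 1
    p
  }
}
```

With distinct bounds both variants agree for every key. If the bounds contain duplicates, the binary search may return any of the matching indices, whereas the linear scan always returns the first.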
[jira] [Commented] (SPARK-2104) RangePartitioner should use user specified serializer to serialize range bounds
[ https://issues.apache.org/jira/browse/SPARK-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045519#comment-14045519 ]

Saisai Shao commented on SPARK-2104:

Hi Reynold, thanks a lot for your code. At first glance it seems to handle the problem you mentioned. Are there any hidden corner cases I might have missed?
[jira] [Commented] (SPARK-2104) RangePartitioner should use user specified serializer to serialize range bounds
[ https://issues.apache.org/jira/browse/SPARK-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045520#comment-14045520 ]

Reynold Xin commented on SPARK-2104:

I don't really remember ... :) It's been a while. I guess adding tests would be important.
[jira] [Commented] (SPARK-2104) RangePartitioner should use user specified serializer to serialize range bounds
[ https://issues.apache.org/jira/browse/SPARK-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045524#comment-14045524 ]

Saisai Shao commented on SPARK-2104:

OK, got it. Thanks a lot

Assignee: Saisai Shao