[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-10-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r227592853
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmarks aims to measure performance of
+ * [SPARK-24900][SQL]speed up sort when the dataset is small
+ */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("speed up sort when the dataset is small")
+    .config(conf)
+    .getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+    val factor = 1000
+    val key = rowsNum / 2
+    val benchmark = new Benchmark("small data sort", rowsNum * factor)
+    withTempPath { path =>
+      // scalastyle:off println
+      benchmark.out.println("Preparing data for benchmarking ...")
+      // scalastyle:on println
+
+      val list = (0 to factor).toList
+
+      spark.sparkContext.range(0, rowsNum, 1)
+        .flatMap(num => {
+          list.map(x => (num, x))
+        })
+        .toDF("key", "value")
+        .write
+        .option("encoding", "UTF-8")
+        .json(path.getAbsolutePath)
+
+      val dataset = spark.read.json(path.getAbsolutePath)
+
+      dataset.createOrReplaceTempView("src")
+
+      benchmark.addCase("with optimization", 10) { _ =>
+        // 334 * 3 > 1000, the optimization works
+        spark.conf.set("spark.sql.shuffle.partitions", dataset.rdd.getNumPartitions)
+        spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "334")
+        val result = spark.
+          sql(s"select * from src where key = $key order by value").collectAsList().size()
+
+      }
+
+      benchmark.addCase("without optimization", 10) { _ =>
+        // 333 * 3 < 1000, the optimization doesn't work
+        spark.conf.set("spark.sql.shuffle.partitions", dataset.rdd.getNumPartitions)
+        spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "333")
+        val result = spark.
+          sql(s"select * from src where key = $key order by value").collectAsList().size()
+
+      }
+
+  /*
+   * Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.13.6
+   * Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+   *
+   * small data sort:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+   * ------------------------------------------------------------------------------------------
+   * with optimization                        54077 / 57989          1.8         540.8       1.0X
+   * without optimization                   111780 / 115001          0.9        1117.8       0.5X
--- End diff --

For the benchmark, we usually put the baseline result on the first line. 
Could you swap lines 71~78 and 80~87?


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r213560104
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -138,7 +138,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
 partitions: Int,
 rdd: RDD[_ <: Product2[K, V]],
 private var ascending: Boolean = true,
-val samplePointsPerPartitionHint: Int = 20)
+val samplePointsPerPartitionHint: Int = 20,
+needCacheSample: Boolean = false)
--- End diff --

Can we create a different `RangePartitioner` in Spark SQL? It's a little 
weird to make a change in the core module that only makes sense in the SQL 
module.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-20 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r211270748
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1207,6 +1207,13 @@ object SQLConf {
   .intConf
   .createWithDefault(100)
 
+  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
+buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
--- End diff --

Yes. The `RangePartitioner` is mainly used for sorting, and not only in Spark 
SQL; it is used in many other places. There is no need for a config here, so 
I removed it, and the optimization now works by default.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-20 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r211230520
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,9 +169,20 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
+  val numSampled = sketched.map(_._3.length).sum
   if (numItems == 0L) {
 Array.empty
   } else {
+// already got the whole data
+if (sampleCacheEnabled && numItems == numSampled) {
+  // get the sampled data
+  sampledArray = new Array[K](numSampled)
+  var curPos = 0
+  sketched.foreach(_._3.foreach(sampleRow => {
--- End diff --

We need the sample data to form an RDD for the next step.
I can't think of a better idea than merging the sample data arrays from 
each partition into one array and parallelizing it into an RDD:
```
sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
```
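
For illustration, a minimal sketch of that merge-and-parallelize step (the helper name and signature are mine, not the PR's code; `sketched` is assumed to have the `(partitionId, itemCount, sampledKeys)` shape produced by `RangePartitioner.sketch`):

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch only: flatten the per-partition sample arrays into one sequence and
// turn it back into an RDD with the original partition count.
def sampledToRdd[K: ClassTag](
    sc: SparkContext,
    sketched: Array[(Int, Long, Array[K])], // (partitionId, itemCount, sampledKeys)
    numPartitions: Int): RDD[K] =
  sc.parallelize(sketched.flatMap(_._3).toSeq, numPartitions)
```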



---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r211168107
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1207,6 +1207,13 @@ object SQLConf {
   .intConf
   .createWithDefault(100)
 
+  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
+buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
--- End diff --

I think this is a feature of Spark core instead of Spark SQL. 
`RangePartitioner` is in Spark core and we can apply this optimization to 
`RDD.sort` as well


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r211167603
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,9 +169,20 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
+  val numSampled = sketched.map(_._3.length).sum
   if (numItems == 0L) {
 Array.empty
   } else {
+// already got the whole data
+if (sampleCacheEnabled && numItems == numSampled) {
+  // get the sampled data
+  sampledArray = new Array[K](numSampled)
+  var curPos = 0
+  sketched.foreach(_._3.foreach(sampleRow => {
--- End diff --

can't we just keep the `sketched` instead of copying it to a new array?


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r211165654
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1207,6 +1207,13 @@ object SQLConf {
   .intConf
   .createWithDefault(100)
 
+  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
+buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
+  .doc("When true, if the sampled data in range partitioning covers 
the whole data, " +
+"the sampled data will be cached for the next sort.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

Do we expect users to tune it?


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-19 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r211131380
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1207,6 +1207,13 @@ object SQLConf {
   .intConf
   .createWithDefault(100)
 
+  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
+buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
+  .doc("When true, if the sampled data in range partitioning covers 
the whole data, " +
+"the sampled data will be cached for the next sort.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

I am also somewhat uncertain. What do you think of it?
@kiszk 


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-19 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r211131294
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1207,6 +1207,13 @@ object SQLConf {
   .intConf
   .createWithDefault(100)
 
+  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
+buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
+  .doc("When true, if the sampled data in range partitioning covers 
the whole data, " +
+"the sampled data will be cached for the next sort.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

I am also somewhat uncertain. What do you think of it?



---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-19 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r211130877
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -155,6 +156,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
 
   private var ordering = implicitly[Ordering[K]]
 
+  private var sampledArray: Array[K] = _
--- End diff --

Yes, this will be better


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r211122674
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1207,6 +1207,13 @@ object SQLConf {
   .intConf
   .createWithDefault(100)
 
+  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
+buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
+  .doc("When true, if the sampled data in range partitioning covers 
the whole data, " +
+"the sampled data will be cached for the next sort.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

Should it be an internal config?


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r211122076
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -155,6 +156,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
 
   private var ordering = implicitly[Ordering[K]]
 
+  private var sampledArray: Array[K] = _
--- End diff --

I think this should be `@transient`.
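
For illustration, a minimal sketch of the idea (the class and method names are hypothetical, not the PR's code): the cached sample only ever lives on the driver, so it should not travel with the serialized object.

```scala
// Sketch only: a driver-side cache marked @transient so it is dropped when the
// object is serialized and shipped to executors.
class CachedSampleHolder[K] extends Serializable {
  @transient private var sampledArray: Array[K] = _ // never serialized

  def setSampledArray(a: Array[K]): Unit = { sampledArray = a }
  def getSampledArray: Array[K] = sampledArray // non-null only on the driver
}
```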


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209486478
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
--- End diff --

Ok. I see.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-12 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209486199
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
--- End diff --

Yes, but

```
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
```

the `parallelize` function needs this `ClassTag`, so we must match the type argument here.
I tried to match `RangePartitioner[_, _]`, but there is an error:
```
Error:(302, 37) No ClassTag available for _
Error occurred in an application involving default arguments.
sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
```
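
A self-contained sketch of the same `ClassTag` issue outside Spark (the `Box` and `materialize` names are mine, purely to illustrate why the wildcard fails and why `@unchecked` is needed on the fixed type argument):

```scala
import scala.reflect.ClassTag

class Box[A](val values: Seq[A])

// A parallelize-like API: it needs a ClassTag for its element type.
def materialize[T: ClassTag](xs: Seq[T]): Array[T] = xs.toArray

def handle(b: Any): Array[_] = b match {
  // case box: Box[_] => materialize(box.values)  // fails: no ClassTag available for the wildcard
  case box: Box[String @unchecked] => materialize(box.values) // compiles; the type argument is erased, hence @unchecked
  case _ => Array.empty[Int]
}
```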


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-11 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209420016
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,9 +169,17 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
+  val numSampled = sketched.map(_._3.length).sum
   if (numItems == 0L) {
 Array.empty
   } else {
+// already got the whole data
+if (sampleCacheEnabled && numItems == numSampled) {
+  // get the sampled data
+  sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) 
=> {
--- End diff --

Yes, I have pushed the improvement.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209418749
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
--- End diff --

I think we don't get a `RangePartitioner` with a type argument other than 
`InternalRow`, right?


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209418206
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
+if partitioner.getSampledArray != null =>
+sparkContext.parallelize(partitioner.getSampledArray.toSeq, 
rdd.getNumPartitions)
--- End diff --

Ah, you are right. The number of partitions is not specified here.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-11 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209418116
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
+if partitioner.getSampledArray != null =>
+sparkContext.parallelize(partitioner.getSampledArray.toSeq, 
rdd.getNumPartitions)
--- End diff --

every partitionId is in the expected range [0, part.numPartitions - 1]


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-11 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209418058
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
+if partitioner.getSampledArray != null =>
+sparkContext.parallelize(partitioner.getSampledArray.toSeq, 
rdd.getNumPartitions)
--- End diff --

```
newRdd.mapPartitionsInternal { iter =>
  val getPartitionKey = getPartitionKeyExtractor()
  val mutablePair = new MutablePair[Int, InternalRow]()
  iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
}
```
The newRdd actually uses the partitioner: it maps each row to (partitionId, row) for the subsequent shuffle.
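
A runnable sketch of that mapping in plain RDD terms (a standalone example, not the PR's code): the partitioner turns each row's key into a partition id in [0, part.numPartitions - 1], and that id is what drives the shuffle.

```scala
import org.apache.spark.{RangePartitioner, SparkContext}

val sc = new SparkContext("local[1]", "range-partition-sketch")
val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (5, "e")), numSlices = 2)

// Build range bounds by sampling, then tag each row with its target partition id.
val part = new RangePartitioner(partitions = 2, rdd = pairs)
val withIds = pairs.map { case (k, v) => (part.getPartition(k), (k, v)) }
withIds.collect().foreach(println) // every id falls in [0, part.numPartitions - 1]
sc.stop()
```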


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-11 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209417745
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
+if partitioner.getSampledArray != null =>
+sparkContext.parallelize(partitioner.getSampledArray.toSeq, 
rdd.getNumPartitions)
--- End diff --

The parallelized sampled data will be assigned to the newRdd. The newRdd will then execute:
```
newRdd.mapPartitionsInternal { iter =>
  val getPartitionKey = getPartitionKeyExtractor()
  val mutablePair = new MutablePair[Int, InternalRow]()
  iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
}
```
The newRdd actually uses the partitioner: it maps each row to (partitionId, row) for the subsequent shuffle.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209417666
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,9 +169,17 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
+  val numSampled = sketched.map(_._3.length).sum
   if (numItems == 0L) {
 Array.empty
   } else {
+// already got the whole data
+if (sampleCacheEnabled && numItems == numSampled) {
+  // get the sampled data
+  sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) 
=> {
--- End diff --

You don't need to always create the array. You can create it here, but allocate 
all the needed space at once instead of growing it incrementally.
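
For illustration, a sketch of that suggestion (the helper name is mine; `sketched` is assumed to have the `(partitionId, count, samples)` shape from `RangePartitioner.sketch`): size the array once from the known lengths, then copy, instead of growing it with repeated `++`.

```scala
import scala.reflect.ClassTag

def mergeSamples[K: ClassTag](sketched: Array[(Int, Long, Array[K])]): Array[K] = {
  val total = sketched.map(_._3.length).sum
  val out = new Array[K](total) // single allocation of the exact size
  var pos = 0
  sketched.foreach { case (_, _, samples) =>
    System.arraycopy(samples, 0, out, pos, samples.length)
    pos += samples.length
  }
  out
}
```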


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209417629
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
+if partitioner.getSampledArray != null =>
+sparkContext.parallelize(partitioner.getSampledArray.toSeq, 
rdd.getNumPartitions)
--- End diff --

Then you will change the number of partitions this shuffle requires.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-10 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209417551
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
+if partitioner.getSampledArray != null =>
+sparkContext.parallelize(partitioner.getSampledArray.toSeq, 
rdd.getNumPartitions)
--- End diff --

```
part match {
  case partitioner: RangePartitioner[InternalRow @unchecked, _]
      if partitioner.getSampledArray != null =>
    sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
  case _ => rdd
}
```
When the optimization applies, it returns the parallelized sampled data instead of the original rdd, so I keep the number of partitions the same as the rdd's here.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-10 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209417115
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,9 +169,17 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
+  val numSampled = sketched.map(_._3.length).sum
   if (numItems == 0L) {
 Array.empty
   } else {
+// already got the whole data
+if (sampleCacheEnabled && numItems == numSampled) {
+  // get the sampled data
+  sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) 
=> {
--- End diff --

As @kiszk suggests in his review:

Do we need to always create `sampledArray` and to store it into a `var`? It may lead to overhead when the execution would go to L182. It would be good to calculate only the length here and to create the array at L179.

Maybe allocating it only when necessary is a better choice.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209410871
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
+if partitioner.getSampledArray != null =>
+sparkContext.parallelize(partitioner.getSampledArray.toSeq, 
rdd.getNumPartitions)
--- End diff --

When you just parallelize the sampled data, it might not have the required 
partitioning (range partitioning); doesn't that affect the result?


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209407875
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,9 +169,17 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
+  val numSampled = sketched.map(_._3.length).sum
   if (numItems == 0L) {
 Array.empty
   } else {
+// already got the whole data
+if (sampleCacheEnabled && numItems == numSampled) {
+  // get the sampled data
+  sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) 
=> {
--- End diff --

As you already know the size of `sampledArray`, maybe you can allocate it 
at once in advance.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r209408395
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
+if partitioner.getSampledArray != null =>
+sparkContext.parallelize(partitioner.getSampledArray.toSeq, 
rdd.getNumPartitions)
--- End diff --

Instead of `rdd.getNumPartitions`, I think we should use 
`partitioner.numPartitions`.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208804032
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,7 +169,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
-  if (numItems == 0L) {
+  val numSampled = sketched.map(_._3.length).sum
+  if (numItems == 0) {
+Array.empty
+  }
+  // already got the whole data
+  else if (sampleCacheEnabled && numItems == numSampled) {
--- End diff --

fixed


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208803622
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
--- End diff --

```
def prepareShuffleDependency(
    rdd: RDD[InternalRow],
    outputAttributes: Seq[Attribute],
    newPartitioning: Partitioning,
    serializer: Serializer,
    sparkContext: SparkContext = null): ShuffleDependency[Int, InternalRow, InternalRow] = {
```
The rdd is defined as `RDD[InternalRow]`, so we must match the type argument here.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208803055
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,7 +169,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
-  if (numItems == 0L) {
+  val numSampled = sketched.map(_._3.length).sum
+  if (numItems == 0) {
+Array.empty
+  }
+  // already got the whole data
+  else if (sampleCacheEnabled && numItems == numSampled) {
+// get the sampled data
+sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) 
=> {
+  total ++ sample._3
+})
 Array.empty
--- End diff --

It may cause errors; I have fixed it.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208801641
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
+if partitioner.getSampledArray != null =>
+sparkContext.parallelize(partitioner.getSampledArray.toSeq)
--- End diff --

change to : sparkContext.parallelize(partitioner.getSampledArray.toSeq, 
rdd.getNumPartitions)


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208801492
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2799,6 +2799,26 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
+  test("SPARK-24900: speed up sort when the dataset is small") {
--- End diff --

UT for RangePartitioner has been added.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208801520
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2799,6 +2799,26 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
+  test("SPARK-24900: speed up sort when the dataset is small") {
+withTempView("src") {
+  // the optimization works
--- End diff --

UT for RangePartitioner has been added.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208775191
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
--- End diff --

Do we need to match the type argument of `RangePartitioner`?

Maybe just:

```scala
part match {
  case partitioner: RangePartitioner[_, _] if partitioner.getSampledArray != null =>
    ...
  case _ => rdd
}
```


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208775694
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
   sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 }
   } else {
-rdd
+part match {
+  case partitioner: RangePartitioner[InternalRow @unchecked, _]
+if partitioner.getSampledArray != null =>
+sparkContext.parallelize(partitioner.getSampledArray.toSeq)
--- End diff --

This should parallelize the sampled array to required partition number too.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208774474
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2799,6 +2799,26 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
+  test("SPARK-24900: speed up sort when the dataset is small") {
+withTempView("src") {
+  // the optimization works
--- End diff --

It is hard to tell from this test whether the optimization actually works. Can 
we check whether `getSampledArray` returns the correct data?
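
A rough sketch of such a check (hypothetical: `getSampledArray` is the accessor added by this PR rather than an existing Spark API, so that assertion is left as a comment; the rest uses only standard Spark API):

```scala
import org.apache.spark.{RangePartitioner, SparkContext}

val sc = new SparkContext("local[2]", "range-partitioner-sample-check")
val data = sc.parallelize((1 to 100).map(i => (i, i.toString)), numSlices = 2)

// A generous sample-size hint so the sketch covers every row of this tiny dataset.
val partitioner = new RangePartitioner(partitions = 4, rdd = data,
  ascending = true, samplePointsPerPartitionHint = 1000)

// assert(partitioner.getSampledArray.sorted.toSeq == (1 to 100)) // PR-added accessor
sc.stop()
```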


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208775584
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,7 +169,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
-  if (numItems == 0L) {
+  val numSampled = sketched.map(_._3.length).sum
+  if (numItems == 0) {
+Array.empty
+  }
+  // already got the whole data
+  else if (sampleCacheEnabled && numItems == numSampled) {
+// get the sampled data
+sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) 
=> {
+  total ++ sample._3
+})
 Array.empty
--- End diff --

Returning `Array.empty` here will result in a single partition.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208774179
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2799,6 +2799,26 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
+  test("SPARK-24900: speed up sort when the dataset is small") {
--- End diff --

We may need a unit test for `RangePartitioner` to verify that it works correctly 
with this change.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208770924
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,7 +169,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
-  if (numItems == 0L) {
+  val numSampled = sketched.map(_._3.length).sum
+  if (numItems == 0) {
+Array.empty
+  }
+  // already got the whole data
+  else if (sampleCacheEnabled && numItems == numSampled) {
--- End diff --

This is a style issue.

```scala
if (numItems == 0) {
  ...
} else if (...) {
  ...
}
```


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-07 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208441135
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+  * The benchmarks aims to measure performance of
+  * [SPARK-24900][SQL]speed up sort when the dataset is small
+  */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+.master("local[1]")
+.appName("speed up sort when the dataset is small")
+.config(conf)
+.getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+val factor = 1000
+val key = rowsNum / 2
+val benchmark = new Benchmark("speed up sort when the dataset is 
small", rowsNum * factor)
+withTempPath { path =>
+  // scalastyle:off println
+  benchmark.out.println("Preparing data for benchmarking ...")
+  // scalastyle:on println
+
+  val list = (0 to factor).toList
+
+  spark.sparkContext.range(0, rowsNum, 1)
+.flatMap(num => {
+  list.map(x => (num, x))
+})
+.toDF("key", "value")
+.write
+.option("encoding", "UTF-8")
+.json(path.getAbsolutePath)
+
+  val dataset = spark.read.json(path.getAbsolutePath)
+
+  dataset.createOrReplaceTempView("src")
+
+  benchmark.addCase("sort with optimization", 10) { _ =>
+
spark.conf.set("spark.sql.execution.rangeExchange.sampleCache.enabled", "true")
+val result = spark.
+  sql(s"select * from src where key = $key order by 
value").collectAsList().size()
+
+  }
+
+  benchmark.addCase("sort without optimization", 10) { _ =>
+
spark.conf.set("spark.sql.execution.rangeExchange.sampleCache.enabled", "false")
+val result = spark.
+  sql(s"select * from src where key = $key order by 
value").collectAsList().size()
+
+  }
+
+  benchmark.run()
+}
+
--- End diff --

I have removed them.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-07 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208441067
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+  * The benchmarks aims to measure performance of
+  * [SPARK-24900][SQL]speed up sort when the dataset is small
+  */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+.master("local[1]")
+.appName("speed up sort when the dataset is small")
+.config(conf)
+.getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+val factor = 1000
+val key = rowsNum / 2
+val benchmark = new Benchmark("speed up sort when the dataset is 
small", rowsNum * factor)
+withTempPath { path =>
+  // scalastyle:off println
+  benchmark.out.println("Preparing data for benchmarking ...")
+  // scalastyle:on println
+
+  val list = (0 to factor).toList
+
+  spark.sparkContext.range(0, rowsNum, 1)
+.flatMap(num => {
+  list.map(x => (num, x))
+})
+.toDF("key", "value")
+.write
+.option("encoding", "UTF-8")
+.json(path.getAbsolutePath)
+
+  benchmark.addCase("sort", 10) { _ =>
+val dataset = spark.read.json(path.getAbsolutePath)
+dataset.createOrReplaceTempView("src")
+val result = spark.
+  sql(s"select * from src where key = $key order by 
value").collectAsList().size()
+
+  }
+
+  benchmark.run()
--- End diff --

Yes, I forgot it. I have added it.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208437010
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+  * The benchmarks aims to measure performance of
+  * [SPARK-24900][SQL]speed up sort when the dataset is small
+  */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+.master("local[1]")
+.appName("speed up sort when the dataset is small")
+.config(conf)
+.getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+val factor = 1000
+val key = rowsNum / 2
+val benchmark = new Benchmark("speed up sort when the dataset is 
small", rowsNum * factor)
+withTempPath { path =>
+  // scalastyle:off println
+  benchmark.out.println("Preparing data for benchmarking ...")
+  // scalastyle:on println
+
+  val list = (0 to factor).toList
+
+  spark.sparkContext.range(0, rowsNum, 1)
+.flatMap(num => {
+  list.map(x => (num, x))
+})
+.toDF("key", "value")
+.write
+.option("encoding", "UTF-8")
+.json(path.getAbsolutePath)
+
+  benchmark.addCase("sort", 10) { _ =>
+val dataset = spark.read.json(path.getAbsolutePath)
+dataset.createOrReplaceTempView("src")
+val result = spark.
+  sql(s"select * from src where key = $key order by 
value").collectAsList().size()
+
+  }
+
+  benchmark.run()
--- End diff --

It would be good to put the result into the source file as a comment. WDYT?


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208436900
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+  * The benchmarks aims to measure performance of
+  * [SPARK-24900][SQL]speed up sort when the dataset is small
+  */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+.master("local[1]")
+.appName("speed up sort when the dataset is small")
+.config(conf)
+.getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+val factor = 1000
+val key = rowsNum / 2
+val benchmark = new Benchmark("speed up sort when the dataset is 
small", rowsNum * factor)
+withTempPath { path =>
+  // scalastyle:off println
+  benchmark.out.println("Preparing data for benchmarking ...")
+  // scalastyle:on println
+
+  val list = (0 to factor).toList
+
+  spark.sparkContext.range(0, rowsNum, 1)
+.flatMap(num => {
+  list.map(x => (num, x))
+})
+.toDF("key", "value")
+.write
+.option("encoding", "UTF-8")
+.json(path.getAbsolutePath)
+
+  val dataset = spark.read.json(path.getAbsolutePath)
+
+  dataset.createOrReplaceTempView("src")
+
+  benchmark.addCase("sort with optimization", 10) { _ =>
+
spark.conf.set("spark.sql.execution.rangeExchange.sampleCache.enabled", "true")
+val result = spark.
+  sql(s"select * from src where key = $key order by 
value").collectAsList().size()
+
+  }
+
+  benchmark.addCase("sort without optimization", 10) { _ =>
+
spark.conf.set("spark.sql.execution.rangeExchange.sampleCache.enabled", "false")
+val result = spark.
+  sql(s"select * from src where key = $key order by 
value").collectAsList().size()
+
+  }
+
+  benchmark.run()
+}
+
--- End diff --

nit: remove unnecessary blank lines. Lines 90 and 94, too.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-07 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208129700
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,7 +170,13 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
-  if (numItems == 0L) {
+  // get the sampled data
+  sampledArray = sketched.foldLeft(sampledArray)((total, sample) => {
--- End diff --

Good idea. I have updated the code to calculate the sampledArray only when necessary.



---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-07 Thread sddyljsx
Github user sddyljsx commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r208125902
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+  * The benchmarks aims to measure performance of
+  * [SPARK-24900][SQL]speed up sort when the dataset is small
+  */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+.master("local[1]")
+.appName("speed up sort when the dataset is small")
+.config(conf)
+.getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+val factor = 1000
+val key = rowsNum / 2
+val benchmark = new Benchmark("speed up sort when the dataset is 
small", rowsNum * factor)
+withTempPath { path =>
+  // scalastyle:off println
+  benchmark.out.println("Preparing data for benchmarking ...")
+  // scalastyle:on println
+
+  val list = (0 to factor).toList
+
+  spark.sparkContext.range(0, rowsNum, 1)
+.flatMap(num => {
+  list.map(x => (num, x))
+})
+.toDF("key", "value")
+.write
+.option("encoding", "UTF-8")
+.json(path.getAbsolutePath)
+
+  benchmark.addCase("sort", 10) { _ =>
+val dataset = spark.read.json(path.getAbsolutePath)
+dataset.createOrReplaceTempView("src")
+val result = spark.
+  sql(s"select * from src where key = $key order by 
value").collectAsList().size()
+
+  }
+
+  benchmark.run()
--- End diff --

The benchmark has been updated, and the result is:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.13.6
Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz

speed up sort when the dataset is small:   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------
sort with optimization                         56695 / 61700          1.8         566.9       1.0X
sort without optimization                    112698 / 115274          0.9        1127.0       0.5X
```

The SQL query speeds up by almost 100% with this optimization.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-06 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r207819972
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,7 +170,13 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   // Assume the input partitions are roughly balanced and over-sample 
a little bit.
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 
rdd.partitions.length).toInt
   val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), 
sampleSizePerPartition)
-  if (numItems == 0L) {
+  // get the sampled data
+  sampledArray = sketched.foldLeft(sampledArray)((total, sample) => {
--- End diff --

Do we need to always create `sampledArray` and to store it into a `var`? It may 
lead to overhead when the execution would go to L182.
It would be good to calculate only the length here and to create the array at L179.


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...

2018-08-06 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21859#discussion_r207817068
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+  * The benchmarks aims to measure performance of
+  * [SPARK-24900][SQL]speed up sort when the dataset is small
+  */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+.master("local[1]")
+.appName("speed up sort when the dataset is small")
+.config(conf)
+.getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+val factor = 1000
+val key = rowsNum / 2
+val benchmark = new Benchmark("speed up sort when the dataset is 
small", rowsNum * factor)
+withTempPath { path =>
+  // scalastyle:off println
+  benchmark.out.println("Preparing data for benchmarking ...")
+  // scalastyle:on println
+
+  val list = (0 to factor).toList
+
+  spark.sparkContext.range(0, rowsNum, 1)
+.flatMap(num => {
+  list.map(x => (num, x))
+})
+.toDF("key", "value")
+.write
+.option("encoding", "UTF-8")
+.json(path.getAbsolutePath)
+
+  benchmark.addCase("sort", 10) { _ =>
+val dataset = spark.read.json(path.getAbsolutePath)
+dataset.createOrReplaceTempView("src")
+val result = spark.
+  sql(s"select * from src where key = $key order by 
value").collectAsList().size()
+
+  }
+
+  benchmark.run()
--- End diff --

Would you please add the performance results with and without this PR to 
this space, like the other benchmarks?


---




[GitHub] spark pull request #21859: [SPARK-24900][SQL]speed up sort when the dataset ...

2018-07-24 Thread sddyljsx
GitHub user sddyljsx opened a pull request:

https://github.com/apache/spark/pull/21859

[SPARK-24900][SQL]speed up sort when the dataset is small

## What changes were proposed in this pull request?

When running SQL like 'select * from order where order_status = 4 order by order_id', 
the file scan and filter will be executed twice (once for the range-partitioning 
sample pass and once for the actual shuffle), which may take a long time. If the 
final dataset is small and the sampled data already covers all of it, there is no 
need to do so.
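
An illustrative sketch of the scenario (the table and column names below are made up for the example): the ORDER BY triggers a range-partitioned shuffle whose sampling pass may already see every filtered row when the filter is selective, which is exactly the case this PR targets.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("small-sort-example").getOrCreate()

// A wide table where only a small fraction of rows survives the filter.
spark.range(0, 1000000)
  .selectExpr("id AS order_id", "id % 100 AS order_status")
  .createOrReplaceTempView("orders")

// The filter keeps ~1% of the rows; the sort's sampling pass may already cover all of them.
spark.sql("SELECT * FROM orders WHERE order_status = 4 ORDER BY order_id").collect()

spark.stop()
```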

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sddyljsx/spark order-optimization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21859.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21859


commit dd50783d638ca5804531061c0a8aef2c8fef9dc1
Author: neal 
Date:   2018-07-24T07:26:58Z

speed up sort when the dataset is small




---
