[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r227592853

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.benchmark
    +
    +import java.io.File
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.util.{Benchmark, Utils}
    +
    +/**
    + * This benchmark aims to measure the performance of
    + * [SPARK-24900][SQL] Speed up sort when the dataset is small
    + */
    +object SmallDataSortBenchmark {
    +
    +  val conf = new SparkConf()
    +
    +  val spark = SparkSession.builder
    +    .master("local[1]")
    +    .appName("speed up sort when the dataset is small")
    +    .config(conf)
    +    .getOrCreate()
    +
    +  import spark.implicits._
    +
    +  def withTempPath(f: File => Unit): Unit = {
    +    val path = Utils.createTempDir()
    +    path.delete()
    +    try f(path) finally Utils.deleteRecursively(path)
    +  }
    +
    +  def run(rowsNum: Int): Unit = {
    +    val factor = 1000
    +    val key = rowsNum / 2
    +    val benchmark = new Benchmark("small data sort", rowsNum * factor)
    +    withTempPath { path =>
    +      // scalastyle:off println
    +      benchmark.out.println("Preparing data for benchmarking ...")
    +      // scalastyle:on println
    +
    +      val list = (0 to factor).toList
    +
    +      spark.sparkContext.range(0, rowsNum, 1)
    +        .flatMap(num => {
    +          list.map(x => (num, x))
    +        })
    +        .toDF("key", "value")
    +        .write
    +        .option("encoding", "UTF-8")
    +        .json(path.getAbsolutePath)
    +
    +      val dataset = spark.read.json(path.getAbsolutePath)
    +
    +      dataset.createOrReplaceTempView("src")
    +
    +      benchmark.addCase("with optimization", 10) { _ =>
    +        // 334 * 3 > 1000, the optimization works
    +        spark.conf.set("spark.sql.shuffle.partitions", dataset.rdd.getNumPartitions)
    +        spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "334")
    +        val result = spark.
    +          sql(s"select * from src where key = $key order by value").collectAsList().size()
    +
    +      }
    +
    +      benchmark.addCase("without optimization", 10) { _ =>
    +        // 333 * 3 < 1000, the optimization doesn't work
    +        spark.conf.set("spark.sql.shuffle.partitions", dataset.rdd.getNumPartitions)
    +        spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "333")
    +        val result = spark.
    +          sql(s"select * from src where key = $key order by value").collectAsList().size()
    +
    +      }
    +
    +      /*
    +       * Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.13.6
    +       * Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
    +       *
    +       * small data sort:          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    +       * ------------------------------------------------------------------------------------
    +       * with optimization             54077 / 57989          1.8         540.8       1.0X
    +       * without optimization        111780 / 115001          0.9        1117.8       0.5X
    --- End diff --

For the benchmark, we usually put the baseline result on the first line. Could you swap lines 71~78 and 80~87?

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
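The `334 * 3 > 1000` and `333 * 3 < 1000` comments in the benchmark depend on how `RangePartitioner` sizes its per-partition sample. The plain-Scala sketch below re-derives that arithmetic.

It relies on three assumptions:
- The `sampleSizePerPartition` formula is the one quoted in the `Partitioner.scala` hunks of this thread.
- `sampleSize = min(hint * partitions, 1e6)`. This is an assumption about the line just above those hunks.
- Each input partition is reservoir-sampled. Under that assumption, the sample equals the whole input (`numItems == numSampled`) exactly when no input partition holds more rows than `sampleSizePerPartition`.

The object and method names are hypothetical.

```scala
object SampleCoverageSketch {
  // Per-partition sample size, following the formula quoted in the diff.
  // ASSUMPTION: sampleSize = min(hint * partitions, 1e6), i.e. capped at one million.
  def sampleSizePerPartition(hint: Int, partitions: Int, inputPartitions: Int): Int = {
    val sampleSize = math.min(hint.toDouble * partitions, 1e6)
    math.ceil(3.0 * sampleSize / inputPartitions).toInt
  }

  // ASSUMPTION: reservoir sampling keeps every row of a partition with at most `k` rows,
  // so the sample covers the whole input iff no partition holds more than `k` rows.
  def sampleCoversAll(rowsPerPartition: Seq[Long], k: Int): Boolean =
    rowsPerPartition.forall(_ <= k)

  def main(args: Array[String]): Unit = {
    // Hypothetical layout: 4 input partitions, the 1001 filtered rows all in one of them.
    val rowsPerPartition = Seq(1001L, 0L, 0L, 0L)
    for (hint <- Seq(334, 333)) {
      // The benchmark sets spark.sql.shuffle.partitions to the input partition count.
      val k = sampleSizePerPartition(hint, partitions = 4, inputPartitions = 4)
      println(s"hint=$hint sampleSizePerPartition=$k covered=${sampleCoversAll(rowsPerPartition, k)}")
    }
  }
}
```

With `factor = 1000`, `(0 to factor)` has 1001 values, so the filtered key holds 1001 rows. If those rows sit in a single input partition, the two hints behave differently:
- A hint of 334 yields 1002 sample slots, so the sample covers everything.
- A hint of 333 yields 999 slots, so it does not.

When the shuffle partition count equals the input partition count, as the benchmark arranges, the per-partition sample size reduces to `3 * hint`.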
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r213560104

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -138,7 +138,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         partitions: Int,
         rdd: RDD[_ <: Product2[K, V]],
         private var ascending: Boolean = true,
    -    val samplePointsPerPartitionHint: Int = 20)
    +    val samplePointsPerPartitionHint: Int = 20,
    +    needCacheSample: Boolean = false)
    --- End diff --

Can we create a different `RangePartitioner` in Spark SQL? It's a little weird to change the core module for something that only makes sense in the SQL module.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r211270748

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1207,6 +1207,13 @@ object SQLConf {
           .intConf
           .createWithDefault(100)
    +  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
    +    buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
    --- End diff --

Yes. The `RangePartitioner` is mainly used for sorting, and it is used in many places besides Spark SQL. There is no need for a config here, so I removed it; the optimization now works by default.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r211230520

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -166,9 +169,20 @@ class RangePartitioner[K : Ordering : ClassTag, V](
           // Assume the input partitions are roughly balanced and over-sample a little bit.
           val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
           val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    +      val numSampled = sketched.map(_._3.length).sum
           if (numItems == 0L) {
             Array.empty
           } else {
    +        // already got the whole data
    +        if (sampleCacheEnabled && numItems == numSampled) {
    +          // get the sampled data
    +          sampledArray = new Array[K](numSampled)
    +          var curPos = 0
    +          sketched.foreach(_._3.foreach(sampleRow => {
    --- End diff --

We need the sampled data to form an RDD for the next step. I can't think of a better approach than merging the sample arrays from each partition into one array and parallelizing it into an RDD:
```
sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
```
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r211168107

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1207,6 +1207,13 @@ object SQLConf {
           .intConf
           .createWithDefault(100)
    +  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
    +    buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
    --- End diff --

I think this is a feature of Spark core rather than Spark SQL. `RangePartitioner` is in Spark core, and we can apply this optimization to `RDD.sort` as well.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r211167603

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -166,9 +169,20 @@ class RangePartitioner[K : Ordering : ClassTag, V](
           // Assume the input partitions are roughly balanced and over-sample a little bit.
           val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
           val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    +      val numSampled = sketched.map(_._3.length).sum
           if (numItems == 0L) {
             Array.empty
           } else {
    +        // already got the whole data
    +        if (sampleCacheEnabled && numItems == numSampled) {
    +          // get the sampled data
    +          sampledArray = new Array[K](numSampled)
    +          var curPos = 0
    +          sketched.foreach(_._3.foreach(sampleRow => {
    --- End diff --

Can't we just keep `sketched` instead of copying it into a new array?
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r211165654

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1207,6 +1207,13 @@ object SQLConf {
           .intConf
           .createWithDefault(100)
    +  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
    +    buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
    +      .doc("When true, if the sampled data in range partitioning covers the whole data, " +
    +        "the sampled data will be cached for the next sort.")
    +      .booleanConf
    +      .createWithDefault(true)
    --- End diff --

Do we expect users to tune it?
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r211131380

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1207,6 +1207,13 @@ object SQLConf {
           .intConf
           .createWithDefault(100)
    +  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
    +    buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
    +      .doc("When true, if the sampled data in range partitioning covers the whole data, " +
    +        "the sampled data will be cached for the next sort.")
    +      .booleanConf
    +      .createWithDefault(true)
    --- End diff --

I am also somewhat uncertain. What do you think, @kiszk?
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r211131294

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1207,6 +1207,13 @@ object SQLConf {
           .intConf
           .createWithDefault(100)
    +  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
    +    buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
    +      .doc("When true, if the sampled data in range partitioning covers the whole data, " +
    +        "the sampled data will be cached for the next sort.")
    +      .booleanConf
    +      .createWithDefault(true)
    --- End diff --

I am also somewhat uncertain. What do you think?
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r211130877

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -155,6 +156,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       private var ordering = implicitly[Ordering[K]]
    +  private var sampledArray: Array[K] = _
    --- End diff --

Yes, this will be better.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r211122674

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1207,6 +1207,13 @@ object SQLConf {
           .intConf
           .createWithDefault(100)
    +  val RANGE_EXCHANGE_SAMPLE_CACHE_ENABLE =
    +    buildConf("spark.sql.execution.rangeExchange.sampleCache.enabled")
    +      .doc("When true, if the sampled data in range partitioning covers the whole data, " +
    +        "the sampled data will be cached for the next sort.")
    +      .booleanConf
    +      .createWithDefault(true)
    --- End diff --

Should it be an internal config?
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r211122076

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -155,6 +156,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       private var ordering = implicitly[Ordering[K]]
    +  private var sampledArray: Array[K] = _
    --- End diff --

I think this should be `@transient`.
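The `@transient` suggestion matters because a partitioner is `Serializable` and is shipped to executors along with the shuffle dependency. An ordinary field holding the whole sample would therefore be serialized with it.

The sketch below uses plain Java serialization and a hypothetical `SamplingPartitioner` class, not Spark's `RangePartitioner`. It shows that a `@transient` field is skipped and comes back as `null`, while an ordinary field survives the round trip.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a partitioner that caches its sample on the driver.
class SamplingPartitioner(val numPartitions: Int) extends java.io.Serializable {
  // Driver-side cache: @transient keeps it out of the serialized form.
  @transient var sampledArray: Array[Int] = _
  // Ordinary field for comparison: always serialized.
  var rangeBounds: Array[Int] = Array.empty[Int]
}

object TransientSketch {
  // Serializes and deserializes `obj`, roughly what shipping it to an executor involves.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val p = new SamplingPartitioner(4)
    p.sampledArray = (0 until 1000).toArray
    p.rangeBounds = Array(250, 500, 750)
    val copy = roundTrip(p)
    println(s"sampledArray after round trip: ${copy.sampledArray}")
    println(s"rangeBounds after round trip: ${copy.rangeBounds.mkString(",")}")
  }
}
```

Under this model, anything running on the executor side must not rely on the cached field, since it will be `null` there. In the diffs in this thread, the sample is read in `prepareShuffleDependency`, which builds the shuffle input on the driver.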
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209486478

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    --- End diff --

OK, I see.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209486199

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    --- End diff --

Yes, but `parallelize` is declared as:
```
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
```
It needs this `ClassTag`, so we must match the type argument here. I tried to match `RangePartitioner[_, _]`, but got this error:
```
Error:(302, 37) No ClassTag available for _
Error occurred in an application involving default arguments.
    sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
```
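The `No ClassTag available for _` error comes from the `T: ClassTag` context bound on `parallelize`. A wildcard type argument gives the compiler no concrete type for which to summon a `ClassTag`.

The sketch below reproduces the situation with stand-ins for `RangePartitioner` and `parallelize`:
- `Holder[K]` is a hypothetical class playing the role of `RangePartitioner`.
- `copyToArray` is a hypothetical function playing the role of `parallelize`.

Matching on a concrete but unchecked type argument, as the diff does with `InternalRow @unchecked`, lets the compiler supply the tag.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for RangePartitioner[K, V] holding a cached sample.
final class Holder[K](val sample: Array[K])

object ClassTagSketch {
  // Like SparkContext.parallelize, this needs a ClassTag for its element type.
  def copyToArray[T: ClassTag](seq: Seq[T]): Array[T] = seq.toArray

  def fromAny(part: Any): Option[Array[String]] = part match {
    // `String @unchecked`: the type argument is erased and NOT checked at runtime;
    // the pattern only fixes the static type, so ClassTag[String] can be summoned.
    case h: Holder[String @unchecked] => Some(copyToArray(h.sample.toSeq))
    // By contrast, `case h: Holder[_] => Some(copyToArray(h.sample.toSeq))` is
    // expected to fail to compile: there is no ClassTag for the wildcard type.
    case _ => None
  }

  def main(args: Array[String]): Unit =
    println(fromAny(new Holder(Array("x", "y"))).map(_.mkString(",")))
}
```

The flip side of `@unchecked` is that a `Holder[Int]` would also match at runtime. The mismatch would then likely surface later as a `ClassCastException`. This is why it matters that only `InternalRow`-keyed partitioners reach this branch.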
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209420016

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -166,9 +169,17 @@ class RangePartitioner[K : Ordering : ClassTag, V](
           // Assume the input partitions are roughly balanced and over-sample a little bit.
           val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
           val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    +      val numSampled = sketched.map(_._3.length).sum
           if (numItems == 0L) {
             Array.empty
           } else {
    +        // already got the whole data
    +        if (sampleCacheEnabled && numItems == numSampled) {
    +          // get the sampled data
    +          sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) => {
    --- End diff --

Yes. I have pushed the improvement.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209418749

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    --- End diff --

I think we never get a `RangePartitioner` with a type argument other than `InternalRow`, right?
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209418206

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    +            if partitioner.getSampledArray != null =>
    +              sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
    --- End diff --

Ah, you are right. The number of partitions is not specified here.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209418116

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    +            if partitioner.getSampledArray != null =>
    +              sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
    --- End diff --

Every partitionId is in the expected range `[0, part.numPartitions - 1]`.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209418058

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    +            if partitioner.getSampledArray != null =>
    +              sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
    --- End diff --

```
newRdd.mapPartitionsInternal { iter =>
  val getPartitionKey = getPartitionKeyExtractor()
  val mutablePair = new MutablePair[Int, InternalRow]()
  iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
}
```
`newRdd` actually uses the partitioner: it maps each row to `(partitionId, row)` for the subsequent shuffle.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209417745

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    +            if partitioner.getSampledArray != null =>
    +              sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
    --- End diff --

The parallelized sampled data will be assigned to `newRdd`, which then executes:
```
newRdd.mapPartitionsInternal { iter =>
  val getPartitionKey = getPartitionKeyExtractor()
  val mutablePair = new MutablePair[Int, InternalRow]()
  iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
}
```
So `newRdd` actually uses the partitioner: it maps each row to `(partitionId, row)` for the subsequent shuffle.
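Parallelizing the sample with an arbitrary number of slices does not break range partitioning. Each row's partition id comes from `part.getPartition`, not from the slice the row sits in.

The sketch below models this with a simplified, hypothetical range lookup: a binary search over sorted upper bounds, ascending order only. It is not Spark's `RangePartitioner.getPartition`, which has additional details such as descending order.

```scala
import java.util.Arrays

object RangeLookupSketch {
  // Simplified range lookup (ascending order only): the partition id is the index of
  // the first upper bound >= key, so it always lies in [0, bounds.length].
  def getPartition(bounds: Array[Int], key: Int): Int = {
    val i = Arrays.binarySearch(bounds, key)
    if (i >= 0) i else -i - 1 // not found: binarySearch returns -(insertionPoint) - 1
  }

  def main(args: Array[String]): Unit = {
    val bounds = Array(10, 20) // 3 output partitions: <= 10, (10, 20], > 20
    val numPartitions = bounds.length + 1
    // Input slices as parallelize might produce them; their count is unrelated to numPartitions.
    val slices = Seq(Seq(25, 3, 14), Seq(20, 7), Seq(11, 30, 10))
    // Map side: tag each row with its partition id, like (partitionId, row) in newRdd.
    val tagged = slices.flatMap(_.map(row => (getPartition(bounds, row), row)))
    assert(tagged.forall { case (pid, _) => pid >= 0 && pid < numPartitions })
    // "Shuffle": group by partition id; each group is then sorted locally.
    val byPartition = tagged.groupBy(_._1).map { case (pid, rows) => pid -> rows.map(_._2).sorted }
    (0 until numPartitions).foreach { pid =>
      println(s"partition $pid -> ${byPartition.getOrElse(pid, Seq.empty[Int])}")
    }
  }
}
```

Reading the partitions in id order then gives a globally sorted result however the input was sliced. That is the property the "doesn't it affect the result?" question is about.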
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209417666

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -166,9 +169,17 @@ class RangePartitioner[K : Ordering : ClassTag, V](
           // Assume the input partitions are roughly balanced and over-sample a little bit.
           val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
           val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    +      val numSampled = sketched.map(_._3.length).sum
           if (numItems == 0L) {
             Array.empty
           } else {
    +        // already got the whole data
    +        if (sampleCacheEnabled && numItems == numSampled) {
    +          // get the sampled data
    +          sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) => {
    --- End diff --

You don't need to create the array in every case. You can create it here, but allocate all the needed space at once instead of growing it incrementally.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209417629

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    +            if partitioner.getSampledArray != null =>
    +              sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
    --- End diff --

Then you will change the number of partitions this shuffle requires.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209417551

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    +            if partitioner.getSampledArray != null =>
    +              sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
    --- End diff --

```
part match {
  case partitioner: RangePartitioner[InternalRow @unchecked, _]
    if partitioner.getSampledArray != null =>
      sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
  case _ => rdd
}
```
When the optimization applies, this returns the parallelized sampled data instead of `rdd`, so I keep the number of partitions the same as `rdd`'s here.
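The slice count passed to `parallelize` only controls how the local sample is split across map-side tasks.

The sketch below models an even-split position arithmetic for slicing a local `Seq`. To my understanding, this is the same arithmetic Spark's `ParallelCollectionRDD` uses. Treat it as an illustration, not a copy of Spark's implementation; `positions` and `slice` are local to the sketch.

```scala
object SliceSketch {
  // Start/end offsets of each slice when `length` elements are split into `numSlices` pieces.
  def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
    (0 until numSlices).map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  // Splits an indexed sequence into `numSlices` contiguous, nearly equal pieces.
  def slice[T](seq: IndexedSeq[T], numSlices: Int): Seq[IndexedSeq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    positions(seq.length.toLong, numSlices).map { case (s, e) => seq.slice(s, e) }
  }

  def main(args: Array[String]): Unit = {
    val sample = (1 to 10).toIndexedSeq
    // Same data, different slice counts: only the map-side split changes.
    println(slice(sample, 3).map(_.mkString("[", ",", "]")).mkString(" "))
    println(slice(sample, 4).map(_.mkString("[", ",", "]")).mkString(" "))
  }
}
```

Under this model, passing `rdd.getNumPartitions` or `partitioner.numPartitions` as the slice count changes only how many map tasks read the sample. The number of shuffle output partitions is still determined by the partitioner.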
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209417115

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -166,9 +169,17 @@ class RangePartitioner[K : Ordering : ClassTag, V](
           // Assume the input partitions are roughly balanced and over-sample a little bit.
           val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
           val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    +      val numSampled = sketched.map(_._3.length).sum
           if (numItems == 0L) {
             Array.empty
           } else {
    +        // already got the whole data
    +        if (sampleCacheEnabled && numItems == numSampled) {
    +          // get the sampled data
    +          sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) => {
    --- End diff --

As @kiszk suggested in his review: "Do we need to always create sampledArray and to store into var? It may lead to overhead when the execution would go to L182. It would be good to calculate only length here and to create the array at L179." Allocating it only when necessary may be a better choice.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209410871

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    +            if partitioner.getSampledArray != null =>
    +              sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
    --- End diff --

If you just parallelize the sampled data, it might not have the required partitioning (range partitioning). Doesn't that affect the result?
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209407875

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -166,9 +169,17 @@ class RangePartitioner[K : Ordering : ClassTag, V](
           // Assume the input partitions are roughly balanced and over-sample a little bit.
           val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
           val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    +      val numSampled = sketched.map(_._3.length).sum
           if (numItems == 0L) {
             Array.empty
           } else {
    +        // already got the whole data
    +        if (sampleCacheEnabled && numItems == numSampled) {
    +          // get the sampled data
    +          sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) => {
    --- End diff --

Since you already know the size of `sampledArray`, maybe you can allocate it all at once in advance.
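The two ways of building the array can be compared in plain Scala: growing it with `foldLeft` and `++`, or allocating it once.

The `(Int, Long, Array[K])` element shape follows the `_._3` accesses on `sketched` in the diff: partition id, item count, and sample. The object and function names are hypothetical.

- `total ++ sample._3` allocates a new array and copies everything accumulated so far at every step. Total copying therefore grows roughly with the number of partitions times the sample size.
- The preallocated version copies each element exactly once.

```scala
import scala.reflect.ClassTag

object FlattenSampleSketch {
  // Incremental version: each `++` allocates a new array and copies everything so far.
  def viaFoldLeft[K: ClassTag](sketched: Array[(Int, Long, Array[K])]): Array[K] =
    sketched.foldLeft(Array.empty[K])((total, sample) => total ++ sample._3)

  // Preallocated version: compute the total size first, then copy each sample once.
  def preallocated[K: ClassTag](sketched: Array[(Int, Long, Array[K])]): Array[K] = {
    val numSampled = sketched.map(_._3.length).sum
    val result = new Array[K](numSampled)
    var curPos = 0
    sketched.foreach { case (_, _, sample) =>
      System.arraycopy(sample, 0, result, curPos, sample.length)
      curPos += sample.length
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val sketched = Array(
      (0, 3L, Array("c", "a", "b")),
      (1, 2L, Array("e", "d")),
      (2, 0L, Array.empty[String]))
    println(preallocated(sketched).mkString(","))
    println(viaFoldLeft(sketched).sameElements(preallocated(sketched)))
  }
}
```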
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r209408395

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    +            if partitioner.getSampledArray != null =>
    +              sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
    --- End diff --

Instead of `rdd.getNumPartitions`, I think we should use `partitioner.numPartitions`.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r208804032

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -166,7 +169,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
           // Assume the input partitions are roughly balanced and over-sample a little bit.
           val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
           val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    -      if (numItems == 0L) {
    +      val numSampled = sketched.map(_._3.length).sum
    +      if (numItems == 0) {
    +        Array.empty
    +      }
    +      // already got the whole data
    +      else if (sampleCacheEnabled && numItems == numSampled) {
    --- End diff --

Fixed.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r208803622

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    --- End diff --

```
def prepareShuffleDependency(
    rdd: RDD[InternalRow],
    outputAttributes: Seq[Attribute],
    newPartitioning: Partitioning,
    serializer: Serializer,
    sparkContext: SparkContext = null): ShuffleDependency[Int, InternalRow, InternalRow] = {
```
The `rdd` is declared as `RDD[InternalRow]`, so we must match the type argument here.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r208803055

    --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
    @@ -166,7 +169,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
           // Assume the input partitions are roughly balanced and over-sample a little bit.
           val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
           val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    -      if (numItems == 0L) {
    +      val numSampled = sketched.map(_._3.length).sum
    +      if (numItems == 0) {
    +        Array.empty
    +      }
    +      // already got the whole data
    +      else if (sampleCacheEnabled && numItems == numSampled) {
    +        // get the sampled data
    +        sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) => {
    +          total ++ sample._3
    +        })
             Array.empty
    --- End diff --

It may cause errors; I have fixed it.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request:
https://github.com/apache/spark/pull/21859#discussion_r208801641

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -294,7 +296,12 @@ object ShuffleExchangeExec {
               sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
             }
           } else {
    -        rdd
    +        part match {
    +          case partitioner: RangePartitioner[InternalRow @unchecked, _]
    +            if partitioner.getSampledArray != null =>
    +              sparkContext.parallelize(partitioner.getSampledArray.toSeq)
    --- End diff --

Changed to `sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)`.
Github user sddyljsx commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208801492

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2799,6 +2799,26 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
+  test("SPARK-24900: speed up sort when the dataset is small") {
--- End diff --

A unit test for `RangePartitioner` has been added.
Github user sddyljsx commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208801520

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2799,6 +2799,26 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
+  test("SPARK-24900: speed up sort when the dataset is small") {
+    withTempView("src") {
+      // the optimization works
--- End diff --

A unit test for `RangePartitioner` has been added.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208775191

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
           sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
         }
       } else {
-        rdd
+        part match {
+          case partitioner: RangePartitioner[InternalRow @unchecked, _]
--- End diff --

Do we need to match the type argument of `RangePartitioner`? Maybe just:

```scala
part match {
  case partitioner: RangePartitioner[_, _]
      if partitioner.getSampledArray != null =>
    ...
  case _ => rdd
}
```
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208775694

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
           sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
         }
       } else {
-        rdd
+        part match {
+          case partitioner: RangePartitioner[InternalRow @unchecked, _]
+            if partitioner.getSampledArray != null =>
+            sparkContext.parallelize(partitioner.getSampledArray.toSeq)
--- End diff --

This should also parallelize the sampled array into the required number of partitions.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208774474

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2799,6 +2799,26 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
+  test("SPARK-24900: speed up sort when the dataset is small") {
+    withTempView("src") {
+      // the optimization works
--- End diff --

It is hard to tell from this test whether the optimization actually works. Can we check that `getSampledArray` returns the correct data?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208775584

--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,7 +169,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       // Assume the input partitions are roughly balanced and over-sample a little bit.
       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
-      if (numItems == 0L) {
+      val numSampled = sketched.map(_._3.length).sum
+      if (numItems == 0) {
+        Array.empty
+      }
+      // already got the whole data
+      else if (sampleCacheEnabled && numItems == numSampled) {
+        // get the sampled data
+        sampledArray = sketched.foldLeft(Array.empty[K])((total, sample) => {
+          total ++ sample._3
+        })
         Array.empty
--- End diff --

Returning `Array.empty` here will result in a single partition.
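In Spark's `RangePartitioner`, the partition count is derived from the bounds (`rangeBounds.length + 1`). A branch that caches the sample but returns `Array.empty` as the bounds therefore collapses every key into one partition. The model below shows the effect. It is a hypothetical `SimpleRangePartitioner` over `Int` keys with a linear scan of the bounds, not the Spark class.

```scala
object RangeBoundsDemo {
  // Simplified model: one more partition than there are range bounds.
  final class SimpleRangePartitioner(bounds: Array[Int]) {
    def numPartitions: Int = bounds.length + 1

    // Keys <= bounds(0) go to partition 0, keys in (bounds(0), bounds(1)]
    // go to partition 1, and so on; keys above the last bound go to the last one.
    def getPartition(key: Int): Int = {
      var p = 0
      while (p < bounds.length && key > bounds(p)) p += 1
      p
    }
  }
}
```

With empty bounds, every key maps to partition 0. With bounds `Array(10, 20)`, keys spread over three partitions.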
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208774179

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2799,6 +2799,26 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
+  test("SPARK-24900: speed up sort when the dataset is small") {
--- End diff --

We may need a unit test for `RangePartitioner` to verify that it works correctly with this change.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208770924

--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,7 +169,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       // Assume the input partitions are roughly balanced and over-sample a little bit.
       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
-      if (numItems == 0L) {
+      val numSampled = sketched.map(_._3.length).sum
+      if (numItems == 0) {
+        Array.empty
+      }
+      // already got the whole data
+      else if (sampleCacheEnabled && numItems == numSampled) {
--- End diff --

This is a style issue. It should be:

```scala
if (numItems == 0) {
  ...
} else if (sampleCacheEnabled && numItems == numSampled) {
  ...
}
```
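The sketch below folds several review points from this thread into one possible shape of the branch:

- this `else if` style fix;
- the point that returning `Array.empty` in the cached branch yields a single partition;
- the suggestion to count sampled keys before materializing them.

It is a standalone model, not the PR's code. `Sketch`, `Outcome`, `decide` and the `computeBounds` callback are hypothetical names. The callback is a placeholder for Spark's bound computation, which also weights candidates and re-samples imbalanced partitions.

```scala
import scala.reflect.ClassTag

object BoundsBranch {
  // (partition id, item count, sample): the shape of `sketched` in the diff.
  type Sketch[K] = (Int, Long, Array[K])

  // Range bounds, plus the cached sample when the sample is the whole data set.
  final case class Outcome[K](bounds: Array[K], cachedSample: Option[Array[K]])

  def decide[K: ClassTag](sketched: Seq[Sketch[K]], sampleCacheEnabled: Boolean)(
      computeBounds: Array[K] => Array[K]): Outcome[K] = {
    val numItems = sketched.map(_._2).sum
    // Deciding needs only the sample sizes, so no array is built yet.
    val numSampled = sketched.map(_._3.length.toLong).sum
    if (numItems == 0L) {
      Outcome(Array.empty[K], None)
    } else if (sampleCacheEnabled && numItems == numSampled) {
      // The sample is the whole data set: build it once in linear time
      // (a foldLeft over `++` copies the accumulated array at every step)
      // and still compute bounds so the partition count is preserved.
      val all = sketched.iterator.flatMap(_._3.iterator).toArray
      Outcome(computeBounds(all), Some(all))
    } else {
      val candidates = sketched.iterator.flatMap(_._3.iterator).toArray
      Outcome(computeBounds(candidates), None)
    }
  }
}
```

The usage checks mirror the unit test requested in this thread. The cached sample should contain exactly the input keys, and it should be absent when the cache is disabled or the sample is partial.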
Github user sddyljsx commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208441135

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmarks aims to measure performance of
+ * [SPARK-24900][SQL]speed up sort when the dataset is small
+ */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("speed up sort when the dataset is small")
+    .config(conf)
+    .getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+    val factor = 1000
+    val key = rowsNum / 2
+    val benchmark = new Benchmark("speed up sort when the dataset is small", rowsNum * factor)
+    withTempPath { path =>
+      // scalastyle:off println
+      benchmark.out.println("Preparing data for benchmarking ...")
+      // scalastyle:on println
+
+      val list = (0 to factor).toList
+
+      spark.sparkContext.range(0, rowsNum, 1)
+        .flatMap(num => {
+          list.map(x => (num, x))
+        })
+        .toDF("key", "value")
+        .write
+        .option("encoding", "UTF-8")
+        .json(path.getAbsolutePath)
+
+      val dataset = spark.read.json(path.getAbsolutePath)
+
+      dataset.createOrReplaceTempView("src")
+
+      benchmark.addCase("sort with optimization", 10) { _ =>
+        spark.conf.set("spark.sql.execution.rangeExchange.sampleCache.enabled", "true")
+        val result = spark.
+          sql(s"select * from src where key = $key order by value").collectAsList().size()
+
+      }
+
+      benchmark.addCase("sort without optimization", 10) { _ =>
+        spark.conf.set("spark.sql.execution.rangeExchange.sampleCache.enabled", "false")
+        val result = spark.
+          sql(s"select * from src where key = $key order by value").collectAsList().size()
+
+      }
+
+      benchmark.run()
+    }
+
--- End diff --

I have removed them.
Github user sddyljsx commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208441067

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmarks aims to measure performance of
+ * [SPARK-24900][SQL]speed up sort when the dataset is small
+ */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("speed up sort when the dataset is small")
+    .config(conf)
+    .getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+    val factor = 1000
+    val key = rowsNum / 2
+    val benchmark = new Benchmark("speed up sort when the dataset is small", rowsNum * factor)
+    withTempPath { path =>
+      // scalastyle:off println
+      benchmark.out.println("Preparing data for benchmarking ...")
+      // scalastyle:on println
+
+      val list = (0 to factor).toList
+
+      spark.sparkContext.range(0, rowsNum, 1)
+        .flatMap(num => {
+          list.map(x => (num, x))
+        })
+        .toDF("key", "value")
+        .write
+        .option("encoding", "UTF-8")
+        .json(path.getAbsolutePath)
+
+      benchmark.addCase("sort", 10) { _ =>
+        val dataset = spark.read.json(path.getAbsolutePath)
+        dataset.createOrReplaceTempView("src")
+        val result = spark.
+          sql(s"select * from src where key = $key order by value").collectAsList().size()
+
+      }
+
+      benchmark.run()
--- End diff --

Yes, I forgot it. I have added it.
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208437010

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmarks aims to measure performance of
+ * [SPARK-24900][SQL]speed up sort when the dataset is small
+ */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("speed up sort when the dataset is small")
+    .config(conf)
+    .getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+    val factor = 1000
+    val key = rowsNum / 2
+    val benchmark = new Benchmark("speed up sort when the dataset is small", rowsNum * factor)
+    withTempPath { path =>
+      // scalastyle:off println
+      benchmark.out.println("Preparing data for benchmarking ...")
+      // scalastyle:on println
+
+      val list = (0 to factor).toList
+
+      spark.sparkContext.range(0, rowsNum, 1)
+        .flatMap(num => {
+          list.map(x => (num, x))
+        })
+        .toDF("key", "value")
+        .write
+        .option("encoding", "UTF-8")
+        .json(path.getAbsolutePath)
+
+      benchmark.addCase("sort", 10) { _ =>
+        val dataset = spark.read.json(path.getAbsolutePath)
+        dataset.createOrReplaceTempView("src")
+        val result = spark.
+          sql(s"select * from src where key = $key order by value").collectAsList().size()
+
+      }
+
+      benchmark.run()
--- End diff --

It would be good to put the result into the source file as a comment. WDYT?
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208436900

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmarks aims to measure performance of
+ * [SPARK-24900][SQL]speed up sort when the dataset is small
+ */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("speed up sort when the dataset is small")
+    .config(conf)
+    .getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+    val factor = 1000
+    val key = rowsNum / 2
+    val benchmark = new Benchmark("speed up sort when the dataset is small", rowsNum * factor)
+    withTempPath { path =>
+      // scalastyle:off println
+      benchmark.out.println("Preparing data for benchmarking ...")
+      // scalastyle:on println
+
+      val list = (0 to factor).toList
+
+      spark.sparkContext.range(0, rowsNum, 1)
+        .flatMap(num => {
+          list.map(x => (num, x))
+        })
+        .toDF("key", "value")
+        .write
+        .option("encoding", "UTF-8")
+        .json(path.getAbsolutePath)
+
+      val dataset = spark.read.json(path.getAbsolutePath)
+
+      dataset.createOrReplaceTempView("src")
+
+      benchmark.addCase("sort with optimization", 10) { _ =>
+        spark.conf.set("spark.sql.execution.rangeExchange.sampleCache.enabled", "true")
+        val result = spark.
+          sql(s"select * from src where key = $key order by value").collectAsList().size()
+
+      }
+
+      benchmark.addCase("sort without optimization", 10) { _ =>
+        spark.conf.set("spark.sql.execution.rangeExchange.sampleCache.enabled", "false")
+        val result = spark.
+          sql(s"select * from src where key = $key order by value").collectAsList().size()
+
+      }
+
+      benchmark.run()
+    }
+
--- End diff --

nit: remove unnecessary blank lines. Lines 90 and 94, too.
Github user sddyljsx commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208129700

--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,7 +170,13 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       // Assume the input partitions are roughly balanced and over-sample a little bit.
       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
-      if (numItems == 0L) {
+      // get the sampled data
+      sampledArray = sketched.foldLeft(sampledArray)((total, sample) => {
--- End diff --

Good idea. I have updated the code to calculate `sampledArray` only when necessary.
Github user sddyljsx commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208125902

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmarks aims to measure performance of
+ * [SPARK-24900][SQL]speed up sort when the dataset is small
+ */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("speed up sort when the dataset is small")
+    .config(conf)
+    .getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+    val factor = 1000
+    val key = rowsNum / 2
+    val benchmark = new Benchmark("speed up sort when the dataset is small", rowsNum * factor)
+    withTempPath { path =>
+      // scalastyle:off println
+      benchmark.out.println("Preparing data for benchmarking ...")
+      // scalastyle:on println
+
+      val list = (0 to factor).toList
+
+      spark.sparkContext.range(0, rowsNum, 1)
+        .flatMap(num => {
+          list.map(x => (num, x))
+        })
+        .toDF("key", "value")
+        .write
+        .option("encoding", "UTF-8")
+        .json(path.getAbsolutePath)
+
+      benchmark.addCase("sort", 10) { _ =>
+        val dataset = spark.read.json(path.getAbsolutePath)
+        dataset.createOrReplaceTempView("src")
+        val result = spark.
+          sql(s"select * from src where key = $key order by value").collectAsList().size()
+
+      }
+
+      benchmark.run()
--- End diff --

The benchmark has been updated, and the result is:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.13.6
Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
speed up sort when the dataset is small:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
sort with optimization                       56695 / 61700          1.8         566.9       1.0X
sort without optimization                  112698 / 115274          0.9        1127.0       0.5X
```

With this optimization, the SQL query speeds up by almost 100%, that is, it runs roughly twice as fast.
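The figures in the table can be cross-checked with a little arithmetic. The `Benchmark` above is constructed with `rowsNum * factor` values per iteration. A best time of 56,695 ms at 566.9 ns per row implies about 1e8 values, i.e. `rowsNum = 100000` with `factor = 1000`; this is inferred from the numbers, since the actual `rowsNum` is not stated. The sketch below assumes the usual definitions of rate, per-row cost and relative speed. `BenchmarkMath` is a hypothetical helper, not part of Spark's `Benchmark` API.

```scala
object BenchmarkMath {
  // Per-row cost in nanoseconds for `rows` values processed in `bestMs`.
  def perRowNs(bestMs: Double, rows: Long): Double = bestMs * 1e6 / rows

  // Throughput in millions of rows per second.
  def rateMPerSec(bestMs: Double, rows: Long): Double = rows / (bestMs / 1000.0) / 1e6

  // "Relative" column: best time of the first case divided by this case's.
  def relative(baselineMs: Double, caseMs: Double): Double = baselineMs / caseMs

  // Speed-up of the fast case over the slow one (2.0 means twice as fast).
  def speedup(fastMs: Double, slowMs: Double): Double = slowMs / fastMs
}
```

The ratio 112698 / 56695 is about 1.99, which matches the "almost 100%" claim. Note that `(0 to factor)` yields 1,001 values per key, so the filter keeps 1,001 rows. The per-row figures are normalized by `rowsNum * factor`, not by the rows that are actually sorted.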
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r207819972

--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -166,7 +170,13 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       // Assume the input partitions are roughly balanced and over-sample a little bit.
       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
-      if (numItems == 0L) {
+      // get the sampled data
+      sampledArray = sketched.foldLeft(sampledArray)((total, sample) => {
--- End diff --

Do we need to always create `sampledArray` and store it in a `var`? This may add overhead when execution goes on to L182. It would be better to compute only the length here and create the array at L179.
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r207817068

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmarks aims to measure performance of
+ * [SPARK-24900][SQL]speed up sort when the dataset is small
+ */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("speed up sort when the dataset is small")
+    .config(conf)
+    .getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+    val factor = 1000
+    val key = rowsNum / 2
+    val benchmark = new Benchmark("speed up sort when the dataset is small", rowsNum * factor)
+    withTempPath { path =>
+      // scalastyle:off println
+      benchmark.out.println("Preparing data for benchmarking ...")
+      // scalastyle:on println
+
+      val list = (0 to factor).toList
+
+      spark.sparkContext.range(0, rowsNum, 1)
+        .flatMap(num => {
+          list.map(x => (num, x))
+        })
+        .toDF("key", "value")
+        .write
+        .option("encoding", "UTF-8")
+        .json(path.getAbsolutePath)
+
+      benchmark.addCase("sort", 10) { _ =>
+        val dataset = spark.read.json(path.getAbsolutePath)
+        dataset.createOrReplaceTempView("src")
+        val result = spark.
+          sql(s"select * from src where key = $key order by value").collectAsList().size()
+
+      }
+
+      benchmark.run()
--- End diff --

Would you please add the performance results with and without this PR here, as other benchmarks do?
[GitHub] spark pull request #21859: [SPARK-24900][SQL]speed up sort when the dataset ...
GitHub user sddyljsx opened a pull request: https://github.com/apache/spark/pull/21859

[SPARK-24900][SQL]speed up sort when the dataset is small

## What changes were proposed in this pull request?

When running SQL such as `select * from order where order_status = 4 order by order_id`, the file scan and filter are executed twice: once while `RangePartitioner` samples the input to compute the range bounds, and again when the shuffle reads it. This may take a long time. If the final dataset is small and the sample already covers all of the data, the second execution is unnecessary.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sddyljsx/spark order-optimization

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21859.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21859

commit dd50783d638ca5804531061c0a8aef2c8fef9dc1
Author: neal
Date:   2018-07-24T07:26:58Z

    speed up sort when the dataset is small
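The cost this PR targets can be illustrated without Spark. The simulation below counts how often a hypothetical expensive `scanAndFilter` runs. The first pass plays the role of sampling: it sees every matching row but keeps at most `k` of them. Here a simple prefix stands in for the reservoir sample. When the sample turns out to hold every row and reuse is allowed, the second pass is skipped. `DoubleScanDemo`, `Source` and `sortFiltered` are illustrative names, not Spark APIs.

```scala
object DoubleScanDemo {
  // Hypothetical data source that counts how often it is scanned.
  final class Source(rows: Seq[(Int, Int)]) {
    var scans = 0

    // Stand-in for "file scan + filter": every call re-reads all rows.
    def scanAndFilter(key: Int): Seq[(Int, Int)] = {
      scans += 1
      rows.filter(_._1 == key)
    }
  }

  // Sort the values of one key. Pass 1 keeps at most k matching rows (the
  // "sample"); pass 2 is needed only if the sample is incomplete or reuse
  // is disabled.
  def sortFiltered(src: Source, key: Int, k: Int, reuseSample: Boolean): Seq[Int] = {
    val firstPass = src.scanAndFilter(key)
    val sample = firstPass.take(k)
    val input =
      if (reuseSample && sample.size == firstPass.size) sample
      else src.scanAndFilter(key)
    input.map(_._2).sorted
  }
}
```

With a sample limit larger than the filtered result, the source is scanned once instead of twice. With a smaller limit, both passes still run, which matches the PR's condition that the sample must cover all of the data.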