[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237737064 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237736770 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c
[GitHub] spark pull request #22965: [SPARK-25964][SQL][Minor] Revise OrcReadBenchmark...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22965#discussion_r231908870 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala --- @@ -32,9 +32,11 @@ import org.apache.spark.sql.types._ * Benchmark to measure ORC read performance. * {{{ * To run this benchmark: - * 1. without sbt: bin/spark-submit --class - * 2. build/sbt "sql/test:runMain " - * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * 1. without sbt: bin/spark-submit --class + *--jars --- End diff -- Thanks @gengliangwang! I also found that some other `Benchmark`s have the wrong jar, for example: ``` UDTSerializationBenchmark: * 1. without sbt: bin/spark-submit --class ``` I will make a PR to update them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22823: [SPARK-25676][SQL][TEST] Rename and refactor Benc...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22823#discussion_r231771399 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideTableBenchmark.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.internal.SQLConf + +/** + * Benchmark to measure performance for wide table. + * {{{ + * To run this benchmark: + * 1. without sbt: bin/spark-submit --class + *--jars , + * 2. build/sbt "sql/test:runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * Results will be written to "benchmarks/WideTableBenchmark-results.txt". 
+ * }}} + */ +object WideTableBenchmark extends SqlBasedBenchmark { + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { +runBenchmark("projection on wide table") { + val N = 1 << 20 + val df = spark.range(N) + val columns = (0 until 400).map{ i => s"id as id$i"} + val benchmark = new Benchmark("projection on wide table", N, output = output) + Seq("10", "100", "1024", "2048", "4096", "8192", "65536").foreach { n => +benchmark.addCase(s"split threshold $n", numIters = 5) { iter => + withSQLConf(SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> n) { +df.selectExpr(columns: _*).foreach(identity(_)) --- End diff -- I see, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22823: [SPARK-25676][SQL][TEST] Rename and refactor Benc...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22823#discussion_r231198094 --- Diff: sql/core/benchmarks/WideTableBenchmark-results.txt --- @@ -0,0 +1,17 @@ + +projection on wide table + + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +projection on wide table:Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +split threshold 10 38932 / 39307 0.0 37128.1 1.0X +split threshold 100 31991 / 32556 0.0 30508.8 1.2X +split threshold 102410993 / 11041 0.1 10483.5 3.5X +split threshold 2048 8959 / 8998 0.1 8543.8 4.3X --- End diff -- I tested it with openjdk (in one Linux VM), `2048` is also the best. ``` projection on wide table OpenJDK 64-Bit Server VM 1.8.0_171-b10 on Linux 3.10.0-693.11.1.el7.x86_64 Intel Core Processor (Haswell) projection on wide table:Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative split threshold 10 23995 / 25673 0.0 22883.7 1.0X split threshold 100 12881 / 13419 0.1 12284.3 1.9X split threshold 1024 6435 / 7402 0.2 6137.2 3.7X split threshold 2048 5861 / 6766 0.2 5589.2 4.1X split threshold 4096 6464 / 7825 0.2 6164.6 3.7X split threshold 8192 7886 / 8742 0.1 7520.7 3.0X split threshold 65536 46143 / 48029 0.0 44005.6 0.5X ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22823: [SPARK-25676][SQL][TEST] Rename and refactor BenchmarkWi...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22823 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22823: [SPARK-25676][SQL][TEST] Rename and refactor Benc...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22823#discussion_r231116690 --- Diff: sql/core/benchmarks/WideTableBenchmark-results.txt --- @@ -0,0 +1,17 @@ + +projection on wide table + + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +projection on wide table:Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +split threshold 10 38932 / 39307 0.0 37128.1 1.0X +split threshold 100 31991 / 32556 0.0 30508.8 1.2X +split threshold 1024 10993 / 11041 0.1 10483.5 3.5X +split threshold 2048 8959 / 8998 0.1 8543.8 4.3X --- End diff -- @dongjoon-hyun On my Mac, in most cases, `2048` is the best. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22823: [SPARK-25676][SQL][TEST] Refactor BenchmarkWideTable to ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22823 @dongjoon-hyun Just pushed the rebased version, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22847: [SPARK-25850][SQL] Make the split threshold for the code...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22847 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22847: [SPARK-25850][SQL] Make the split threshold for the code...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22847 @cloud-fan @rednaxelafx I missed that! Please help review. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22847: [SPARK-25850][SQL] Make the split threshold for the code...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22847 @cloud-fan @gatorsmile How about merging this PR first, and then discussing those performance issues in other PRs? 1. One PR to improve WideTableBenchmark (#22823, WIP). 2. One PR to add more tests to WideTableBenchmark. 3. If we can figure out a good split threshold based on 2, another PR to update that value. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22847: [SPARK-25850][SQL] Make the split threshold for t...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22847#discussion_r230248773 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -812,6 +812,17 @@ object SQLConf { .intConf .createWithDefault(65535) + val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold") +.internal() +.doc("The threshold of source code length without comment of a single Java function by " + + "codegen to be split. When the generated Java function source code exceeds this threshold" + + ", it will be split into multiple small functions. We can't know how many bytecode will " + + "be generated, so use the code length as metric. A function's bytecode should not go " + + "beyond 8KB, otherwise it will not be JITted; it also should not be too small, otherwise " + + "there will be many function calls.") +.intConf --- End diff -- @rednaxelafx the wide table benchmark I used has 400 columns, whole stage codegen is disabled by default. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22847: [SPARK-25850][SQL] Make the split threshold for the code...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22847 I used the WideTableBenchmark to test this configuration. Four scenarios were tested: `2048` is always better than `1024`, performs well overall, and looks safer for avoiding the 8KB limitation. **Scenario** 1. projection on wide table: simple ``` val N = 1 << 20 val df = spark.range(N) val columns = (0 until 400).map{ i => s"id as id$i"} df.selectExpr(columns: _*).foreach(identity(_)) ``` 2. projection on wide table: long alias names ``` val longName = "averylongaliasname" * 20 val columns = (0 until 400).map{ i => s"id as ${longName}_id$i"} df.selectExpr(columns: _*).foreach(identity(_)) ``` 3. projection on wide table: many complex expressions ``` // 400 columns, whole stage codegen is disabled for spark.sql.codegen.maxFields val columns = (0 until 400).map{ i => s"case when id = $i then $i else 800 end as id$i"} df.selectExpr(columns: _*).foreach(identity(_)) ``` 4. projection on wide table: a big complex expression ``` // Because of spark.sql.subexpressionElimination.enabled, // the whole case when codes will be put into one function, // and it will be invoked once only. val columns = (0 until 400).map{ i => s"case when id = ${N + 1} then 1 when id = ${N + 2} then 1 ...
when id = ${N + 6} then 1 else sqrt(N) end as id$i"} df.selectExpr(columns: _*).foreach(identity(_)) ``` **Perf Results** ``` projection on wide table: simple Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz projection on wide table: simple:Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative split threshold 107553 / 7676 0.1 7202.7 1.0X split threshold 100 5463 / 5504 0.2 5210.0 1.4X split threshold 1024 2981 / 3017 0.4 2843.0 2.5X split threshold 2048 2857 / 2897 0.4 2724.2 2.6X split threshold 4096 3128 / 3187 0.3 2983.3 2.4X split threshold 8196 3755 / 3793 0.3 3581.3 2.0X split threshold 65536 27616 / 27685 0.0 26336.2 0.3X projection on wide table: long alias names Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz projection on wide table: long alias names: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative split threshold 107513 / 7566 0.1 7164.6 1.0X split threshold 100 5363 / 5410 0.2 5114.4 1.4X split threshold 1024 2966 / 2998 0.4 2828.3 2.5X split threshold 2048 2840 / 2864 0.4 2708.0 2.6X split threshold 4096 3126 / 3166 0.3 2981.2 2.4X split threshold 8196 3756 / 3823 0.3 3582.3 2.0X split threshold 65536 27542 / 27729 0.0 26266.4 0.3X projection on wide table: many complex expressions Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz projection on wide table: complex expressions 1: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative split threshold 108758 / 9007 0.1 8352.3 1.0X split threshold 1
[GitHub] spark pull request #22847: [SPARK-25850][SQL] Make the split threshold for t...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22847#discussion_r229919857 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -812,6 +812,17 @@ object SQLConf { .intConf .createWithDefault(65535) + val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold") +.internal() +.doc("The threshold of source code length without comment of a single Java function by " + + "codegen to be split. When the generated Java function source code exceeds this threshold" + + ", it will be split into multiple small functions. We can't know how many bytecode will " + + "be generated, so use the code length as metric. A function's bytecode should not go " + + "beyond 8KB, otherwise it will not be JITted; it also should not be too small, otherwise " + + "there will be many function calls.") +.intConf --- End diff -- Seems like long alias names have no influence. ``` [info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 [info] Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz [info] projection on wide table:Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative [info] [info] split threshold 106512 / 6736 0.26210.4 1.0X [info] split threshold 100 5730 / 6329 0.25464.9 1.1X [info] split threshold 1024 3119 / 3184 0.32974.6 2.1X [info] split threshold 2048 2981 / 3100 0.42842.9 2.2X [info] split threshold 4096 3289 / 3379 0.33136.6 2.0X [info] split threshold 8196 4307 / 4338 0.24108.0 1.5X [info] split threshold 65536 29147 / 30212 0.0 27797.0 0.2X ``` No `averylongprefixrepeatedmultipletimes` in the **expression code gen**: ``` /* 047 */ private void createExternalRow_0_8(InternalRow i, Object[] values_0) { /* 048 */ /* 049 */ // input[80, bigint, false] /* 050 */ long value_81 = i.getLong(80); /* 051 */ if (false) { /* 052 */ values_0[80] = null; /* 053 */ } else { /* 054 */ values_0[80] = value_81; /* 055 */ } /* 056 */ /* 057 */ // input[81, bigint, false] /* 058 */ 
long value_82 = i.getLong(81); /* 059 */ if (false) { /* 060 */ values_0[81] = null; /* 061 */ } else { /* 062 */ values_0[81] = value_82; /* 063 */ } /* 064 */ /* 065 */ // input[82, bigint, false] /* 066 */ long value_83 = i.getLong(82); /* 067 */ if (false) { /* 068 */ values_0[82] = null; /* 069 */ } else { /* 070 */ values_0[82] = value_83; /* 071 */ } /* 072 */ ... ``` My benchmark: ``` object WideTableBenchmark extends SqlBasedBenchmark { override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runBenchmark("projection on wide table") { val N = 1 << 20 val df = spark.range(N) val columns = (0 until 400).map{ i => s"id as averylongprefixrepeatedmultipletimes_id$i"} val benchmark = new Benchmark("projection on wide table", N, output = output) Seq("10", "100", "1024", "2048", "4096", "8196", "65536").foreach { n => benchmark.addCase(s"split threshold $n", numIters = 5) { iter => withSQLConf("spark.testing.codegen.splitThreshold" -> n) { df.selectExpr(columns: _*).foreach(identity(_)) } } } benchmark.run() } } } ``` Will keep benchmarking for the complex expression. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22847: [SPARK-25850][SQL] Make the split threshold for t...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22847#discussion_r229638917 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -812,6 +812,17 @@ object SQLConf { .intConf .createWithDefault(65535) + val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold") +.internal() +.doc("The threshold of source code length without comment of a single Java function by " + + "codegen to be split. When the generated Java function source code exceeds this threshold" + + ", it will be split into multiple small functions. We can't know how many bytecode will " + + "be generated, so use the code length as metric. A function's bytecode should not go " + + "beyond 8KB, otherwise it will not be JITted; it also should not be too small, otherwise " + + "there will be many function calls.") +.intConf --- End diff -- @kiszk agree, `1000` might be not the best, see my benchmark for the wide table, `2048` is better. ``` projection on wide table Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz projection on wide table:Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative split threshold 108464 / 8737 0.1 8072.0 1.0X split threshold 100 5959 / 6251 0.2 5683.4 1.4X split threshold 1024 3202 / 3248 0.3 3053.2 2.6X split threshold 2048 3009 / 3097 0.3 2869.2 2.8X split threshold 4096 3414 / 3458 0.3 3256.1 2.5X split threshold 8196 4095 / 4112 0.3 3905.5 2.1X split threshold 65536 28800 / 29705 0.0 27465.8 0.3X ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22879: [SPARK-25872][SQL][TEST] Add an optimizer tracker for TP...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22879 `tpcdsQueries` and `tpcdsQueriesV2_7` duplicate those in `TPCDSQueryBenchmark`; should we maintain them together? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22847: [SPARK-25850][SQL] Make the split threshold for the code...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22847 @cloud-fan @dongjoon-hyun @kiszk I just added a check against negative values; maybe we need another PR to figure out a better value later if it is not easy to decide now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22861: [SPARK-25663][SPARK-25661][SQL][TEST] Refactor BuiltInDa...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22861 @dongjoon-hyun Tests have passed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22861: [SPARK-25663][SPARK-25661][SQL][TEST] Refactor Bu...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22861#discussion_r229162191 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala --- @@ -19,22 +19,17 @@ package org.apache.spark.sql.execution.benchmark /** * Benchmark to measure Avro data sources write performance. - * Usage: - * 1. with spark-submit: bin/spark-submit --class - * 2. with sbt: build/sbt "avro/test:runMain " + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class --- End diff -- @yucai Good catch! I think it needs ``, added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22845: [SPARK-25848][SQL][TEST] Refactor CSVBenchmarks t...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22845#discussion_r229011040 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala --- @@ -137,22 +124,15 @@ object CSVBenchmarks extends SQLHelper { ds.count() } - /* - Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz - - Count a dataset with 10 columns: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative - - - Select 10 columns + count() 12598 / 12740 0.8 1259.8 1.0X - Select 1 column + count() 7960 / 8175 1.3 796.0 1.6X - count()2332 / 2386 4.3 233.2 5.4X - */ benchmark.run() } } - def main(args: Array[String]): Unit = { -quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3) -multiColumnsBenchmark(rowsNum = 1000 * 1000) -countBenchmark(10 * 1000 * 1000) + override def runBenchmarkSuite(): Unit = { --- End diff -- #22872 has updated `runBenchmarkSuite`'s signature. ```suggestion override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22844: [SPARK-25847][SQL][TEST] Refactor JSONBenchmarks ...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22844#discussion_r229010476 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala --- @@ -195,23 +170,16 @@ object JSONBenchmarks extends SQLHelper { ds.count() } - /* - Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz - - Count a dataset with 10 columns: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative - - - Select 10 columns + count() 9961 / 10006 1.0 996.1 1.0X - Select 1 column + count() 8355 / 8470 1.2 835.5 1.2X - count()2104 / 2156 4.8 210.4 4.7X - */ benchmark.run() } } - def main(args: Array[String]): Unit = { -schemaInferring(100 * 1000 * 1000) -perlineParsing(100 * 1000 * 1000) -perlineParsingOfWideColumn(10 * 1000 * 1000) -countBenchmark(10 * 1000 * 1000) + override def runBenchmarkSuite(): Unit = { --- End diff -- #22872 has updated `runBenchmarkSuite`'s signature. ```suggestion override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22861: [SPARK-25663][SPARK-25661][SQL][TEST] Refactor BuiltInDa...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22861 @dongjoon-hyun I used #22872 to make main args accessible to `BenchmarkBase`'s subclasses; this PR is mainly for refactoring `DataSourceWriteBenchmark` and `BuiltInDataSourceWriteBenchmark`. But that also requires refactoring `AvroWriteBenchmark`, so I put them in one PR. Kindly let me know if you have a better idea. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22872: [SPARK-25864][SQL][TEST] Make main args accessible for B...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22872 @HyukjinKwon @cloud-fan My previous tests have passed :). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22847: [SPARK-25850][SQL] Make the split threshold for the code...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22847 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22861: [SPARK-25663][SPARK-25661][SQL][TEST] Refactor BuiltInDa...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22861 The current implementation misses main args, but some suites would need them anyway. Let's discuss it in #22872. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22872: [SPARK-25864][SQL][TEST] Make main args accessible for B...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22872 @gengliangwang @dongjoon-hyun @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22872: [SPARK-25864][SQL][TEST] Make main args accessibl...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/22872 [SPARK-25864][SQL][TEST] Make main args accessible for BenchmarkBase's subclass ## What changes were proposed in this pull request? Set main args correctly in BenchmarkBase, to make them accessible to its subclasses. It will benefit: - BuiltInDataSourceWriteBenchmark - AvroWriteBenchmark ## How was this patch tested? manual tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark main_args Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22872.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22872 commit 269632148cac72c1e19a6d843a1ec647dfd10bf4 Author: yucai Date: 2018-10-29T05:45:02Z [SPARK-25864][SQL][TEST] Make main args set correctly in BenchmarkBase --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22847: [SPARK-25850][SQL] Make the split threshold for t...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22847#discussion_r228788123 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -812,6 +812,17 @@ object SQLConf { .intConf .createWithDefault(65535) + val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold") +.internal() +.doc("The threshold of source code length without comment of a single Java function by " + + "codegen to be split. When the generated Java function source code exceeds this threshold" + + ", it will be split into multiple small functions. We can't know how many bytecode will " + + "be generated, so use the code length as metric. A function's bytecode should not go " + + "beyond 8KB, otherwise it will not be JITted; it also should not be too small, otherwise " + + "there will be many function calls.") +.intConf --- End diff -- To be more accurate, I think I should add `When running on HotSpot, a function's bytecode should not go beyond 8KB...`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22861: [SPARK-25663][SPARK-25661][SQL][TEST] Refactor BuiltInDa...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22861 @dongjoon-hyun Originally, I wanted to do two things in this PR. 1. Make `mainArgs` set correctly in `BenchmarkBase`. 2. Include an example of using `mainArgs`: refactor `DataSourceWriteBenchmark` and `BuiltInDataSourceWriteBenchmark` to use the main method. But refactoring `DataSourceWriteBenchmark` also requires refactoring `AvroWriteBenchmark`. Any suggestion on how to split the PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22861: [SPARK-25663][SPARK-25661][SQL][TEST] Refactor BuiltInDa...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22861 @gengliangwang @wangyum @dongjoon-hyun could you help review? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22861: [SPARK-25663][SPARK-25661][SQL][TEST] Refactor Bu...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/22861 [SPARK-25663][SPARK-25661][SQL][TEST] Refactor BuiltInDataSourceWriteBenchmark, DataSourceWriteBenchmark and AvroWriteBenchmark to use main method ## What changes were proposed in this pull request? Refactor BuiltInDataSourceWriteBenchmark, DataSourceWriteBenchmark and AvroWriteBenchmark to use main method. ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.BuiltInDataSourceWriteBenchmark" SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.BuiltInDataSourceWriteBenchmark Parquet ORC" SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.AvroWriteBenchmark" ``` ## How was this patch tested? manual tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark BuiltInDataSourceWriteBenchmark Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22861.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22861 commit 81fe383d4f1189c3a4a7bae32f8ca38d123e6d7d Author: yucai Date: 2018-10-26T05:40:29Z BuiltInDataSourceWriteBenchmark --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22823: [SPARK-25676][SQL][TEST] Improve BenchmarkWideTab...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22823#discussion_r228715829 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala --- @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.benchmark - -import org.apache.spark.benchmark.Benchmark - -/** - * Benchmark to measure performance for wide table. - * To run this: - * build/sbt "sql/test-only *benchmark.BenchmarkWideTable" - * - * Benchmarks in this file are skipped in normal builds. - */ -class BenchmarkWideTable extends BenchmarkWithCodegen { - - ignore("project on wide table") { -val N = 1 << 20 -val df = sparkSession.range(N) -val columns = (0 until 400).map{ i => s"id as id$i"} -val benchmark = new Benchmark("projection on wide table", N) -benchmark.addCase("wide table", numIters = 5) { iter => - df.selectExpr(columns : _*).queryExecution.toRdd.count() -} -benchmark.run() - -/** - * Here are some numbers with different split threshold: - * - * Split threshold methods Rate(M/s) Per Row(ns) - * 10 400 0.4 2279 - * 100 200 0.6 1554 - * 1k 370.9 1116 --- End diff -- @dongjoon-hyun I am working on #22847. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22847: [SPARK-25850][SQL] Make the split threshold for the code...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22847 @cloud-fan @dongjoon-hyun @gengliangwang Kindly help review. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22847: [SPARK-25850][SQL] Make the split threshold for t...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/22847 [SPARK-25850][SQL] Make the split threshold for the code generated method configurable ## What changes were proposed in this pull request? As per the [discussion](https://github.com/apache/spark/pull/22823/files#r228400706), add a new configuration to make the split threshold for the code generated method configurable. When the generated Java function source code exceeds the split threshold, it will be split into multiple small functions, each roughly spark.sql.codegen.methodSplitThreshold in length. ## How was this patch tested? manual tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark splitThreshold Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22847.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22847 commit 188a9476e3504c151ebe27b362a080469c262674 Author: yucai Date: 2018-10-26T08:00:24Z [SPARK-25850][SQL] Make the split threshold for the code generated method configurable --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22823: [SPARK-25676][SQL][TEST] Refactor BenchmarkWideTa...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22823#discussion_r228387605 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -910,12 +910,14 @@ class CodegenContext { val blocks = new ArrayBuffer[String]() val blockBuilder = new StringBuilder() var length = 0 +val splitThreshold = + SQLConf.get.getConfString("spark.testing.codegen.splitThreshold", "1024").toInt --- End diff -- @wangyum Thanks for the suggestion! You prefer modifying `CodeGenerator.scala` each time we run this benchmark, right? I feel it could be tricky to ask users to modify the code, and if `CodeGenerator.scala` changes in the future, it will be hard to keep the documentation here up to date. @dongjoon-hyun @gengliangwang any suggestions? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22823: [SPARK-25676][SQL][TEST] Refactor BenchmarkWideTable to ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22823 Thanks @wangyum for good suggestion! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22823: [SPARK-25676][SQL][TEST] Refactor BenchmarkWideTa...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/22823 [SPARK-25676][SQL][TEST] Refactor BenchmarkWideTable to use main method ## What changes were proposed in this pull request? Refactor BenchmarkWideTable to use main method. Generate benchmark result: ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.BenchmarkWideTable" ``` ## How was this patch tested? manual tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark BenchmarkWideTable Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22823.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22823 commit f8bce475682a18db7dee34e8a6fdfcde23166457 Author: yucai Date: 2018-10-21T09:18:07Z [SPARK-25676][SQL][TEST] Refactor BenchmarkWideTable to use main method --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/19788 @cloud-fan , exception screenshot. Let me know if you want any change. ![image](https://user-images.githubusercontent.com/2989575/47471258-1793ce00-d83c-11e8-90bf-107865fc9032.png) ![image](https://user-images.githubusercontent.com/2989575/47471297-4316b880-d83c-11e8-8d65-004ba5c3fd18.png) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/21156#discussion_r227268021 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinUtils.scala --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.joins + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution, HashPartitioning} +import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec + +object JoinUtils { + private def avoidShuffleIfPossible( + joinKeys: Seq[Expression], + expressions: Seq[Expression], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression]): Seq[Distribution] = { +val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x))) +HashClusteredDistribution(indices.map(leftKeys(_))) :: + HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil + } + + def requiredChildDistributionForShuffledJoin( + partitioningDetection: Boolean, + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + left: SparkPlan, + right: SparkPlan): Seq[Distribution] = { +if (!partitioningDetection) { + return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil +} + +val leftPartitioning = left.outputPartitioning +val rightPartitioning = right.outputPartitioning --- End diff -- @cloud-fan in this PR, `requiredChildDistribution` is always re-calculated each time it is invoked, could it be more precise than `EnsureRequirements.reorderJoinPredicates`? This kind of scenario is common, do we have a plan to improve the framework in 3.0? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22580: [SPARK-25508][SQL] Refactor OrcReadBenchmark to u...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/22580 [SPARK-25508][SQL] Refactor OrcReadBenchmark to use main method ## What changes were proposed in this pull request? Refactor OrcReadBenchmark to use main method. Generate benchmark result: ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain org.apache.spark.sql.hive.orc.OrcReadBenchmark" ``` ## How was this patch tested? manual tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark SPARK-25508 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22580.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22580 commit 4b6ea8706f83d5995ef61938ac4367185cd00378 Author: yucai Date: 2018-09-21T13:26:29Z Refactor OrcReadBenchmark --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22493: [SPARK-25485][SQL][TEST] Refactor UnsafeProjectionBenchm...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22493 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22493: [SPARK-25485][SQL][TEST] Refactor UnsafeProjectionBenchm...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22493 @dongjoon-hyun could you help review? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22495: [SPARK-25486][TEST] Refactor SortBenchmark to use...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22495#discussion_r219731873 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala --- @@ -28,12 +28,15 @@ import org.apache.spark.util.random.XORShiftRandom /** * Benchmark to measure performance for aggregate primitives. - * To run this: - * build/sbt "sql/test-only *benchmark.SortBenchmark" - * - * Benchmarks in this file are skipped in normal builds. + * {{{ + * To run this benchmark: + * 1. without sbt: bin/spark-submit --class + * 2. build/sbt "sql/test:runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * Results will be written to "benchmarks/-results.txt". + * }}} */ -class SortBenchmark extends BenchmarkWithCodegen { +object SortBenchmark extends BenchmarkBase { --- End diff -- @dongjoon-hyun `SortBenchmark` does not use any function provided by `BenchmarkWithCodegen`, so I removed it. Another option is to do what #22484 did: make `BenchmarkWithCodegen` extend `BenchmarkBase`, and then `SortBenchmark` can extend `BenchmarkWithCodegen`. Do you prefer the second way? BTW, congratulations! :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22490: [SPARK-25481][TEST] Refactor ColumnarBatchBenchma...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22490#discussion_r219548887 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala --- @@ -30,8 +30,13 @@ import org.apache.spark.util.collection.BitSet /** * Benchmark to low level memory access using different ways to manage buffers. + * To run this benchmark: --- End diff -- Oh, I see, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22495: [SPARK-25486][TEST] Refactor SortBenchmark to use...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/22495 [SPARK-25486][TEST] Refactor SortBenchmark to use main method ## What changes were proposed in this pull request? Refactor SortBenchmark to use main method. Generate benchmark result: ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.SortBenchmark" ``` ## How was this patch tested? manual tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark SPARK-25486 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22495.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22495 commit c3c3d8bb5f7c0fc476c3960f12e53133efd3e2f2 Author: yucai Date: 2018-09-20T13:52:40Z Refactor SortBenchmark --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22493: [SPARK-25485][TEST] Refactor UnsafeProjectionBenc...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/22493 [SPARK-25485][TEST] Refactor UnsafeProjectionBenchmark to use main method ## What changes were proposed in this pull request? Refactor `UnsafeProjectionBenchmark` to use main method. Generate benchmark result: ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.UnsafeProjectionBenchmark" ``` ## How was this patch tested? manual test You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark SPARK-25485 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22493.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22493 commit 9ba1f582c230223be5ea0965d49d4f495f7ca876 Author: yucai Date: 2018-09-20T12:28:39Z [SPARK-25485][TEST] Refactor UnsafeProjectionBenchmark to use main method --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22490: [SPARK-25481][TEST] Refactor ColumnarBatchBenchmark to u...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22490 @wangyum Could you help review? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22490: [SPARK-25481][TEST] Refactor ColumnarBatchBenchma...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/22490 [SPARK-25481][TEST] Refactor ColumnarBatchBenchmark to use main method ## What changes were proposed in this pull request? Refactor `ColumnarBatchBenchmark` to use main method. Generate benchmark result: ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.vectorized.ColumnarBatchBenchmark" ``` ## How was this patch tested? manual tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark SPARK-25481 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22490.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22490 commit 912348f005b392e8cfaf9076e023f487eb6b033f Author: yucai Date: 2018-09-20T10:44:10Z Refactor ColumnarBatchBenchmark to use main method commit 01b22e715d9743e3b3551fa14234c04d30862a41 Author: yucai Date: 2018-09-20T10:51:59Z minor --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21791: [SPARK-24925][SQL] input bytesRead metrics fluctuate fro...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/21791 @kiszk I feel it is hard to add a unit test; do you have any ideas? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21791: [SPARK-24925][SQL] input bytesRead metrics fluctuate fro...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/21791 @kiszk I see #22324 has been resolved; it is actually one of my PR's dependencies (see https://issues.apache.org/jira/browse/SPARK-24925?focusedCommentId=16556818&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16556818). I have been blocked on it for a long time, so I will update this one soon. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 @cloud-fan, tests have passed. And I will use a followup PR to make it cleaner. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22197: [SPARK-25207][SQL] Case-insensitve field resoluti...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22197#discussion_r213912108 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala --- @@ -1021,6 +1022,113 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } } + + test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet") { +def createParquetFilter(caseSensitive: Boolean): ParquetFilters = { + new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, +conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, +conf.parquetFilterPushDownInFilterThreshold, caseSensitive) +} +val caseSensitiveParquetFilters = createParquetFilter(caseSensitive = true) +val caseInsensitiveParquetFilters = createParquetFilter(caseSensitive = false) + +def testCaseInsensitiveResolution( +schema: StructType, +expected: FilterPredicate, +filter: sources.Filter): Unit = { + val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema) + + assertResult(Some(expected)) { +caseInsensitiveParquetFilters.createFilter(parquetSchema, filter) + } + assertResult(None) { +caseSensitiveParquetFilters.createFilter(parquetSchema, filter) + } +} + +val schema = StructType(Seq(StructField("cint", IntegerType))) + +testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), null.asInstanceOf[Integer]), sources.IsNull("CINT")) + +testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), null.asInstanceOf[Integer]), + sources.IsNotNull("CINT")) + +testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualTo("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), 1000: Integer), + sources.Not(sources.EqualTo("CINT", 1000))) + +testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), 1000: Integer), 
sources.EqualNullSafe("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), 1000: Integer), + sources.Not(sources.EqualNullSafe("CINT", 1000))) + +testCaseInsensitiveResolution( + schema, + FilterApi.lt(intColumn("cint"), 1000: Integer), sources.LessThan("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.ltEq(intColumn("cint"), 1000: Integer), + sources.LessThanOrEqual("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, FilterApi.gt(intColumn("cint"), 1000: Integer), sources.GreaterThan("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.gtEq(intColumn("cint"), 1000: Integer), + sources.GreaterThanOrEqual("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.or( +FilterApi.eq(intColumn("cint"), 10: Integer), +FilterApi.eq(intColumn("cint"), 20: Integer)), + sources.In("CINT", Array(10, 20))) + +val dupFieldSchema = StructType( + Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType))) +val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema) +assertResult(None) { + caseInsensitiveParquetFilters.createFilter( +dupParquetSchema, sources.EqualTo("CINT", 1000)) +} + } + + test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { +withTempPath { dir => + val tableName = "spark_25207" + val tableDir = dir.getAbsoluteFile + "/table" + withTable(tableName) { +withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + spark.range(10).selectExpr("id as A", "id as B", "id as b") +.write.mode("overwrite").parquet(tableDir) +} +sql( + s""" + |CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir' + """.stripMargin) + +
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 @cloud-fan I reverted to the previous version. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 @dongjoon-hyun Sorry for the late response; the description has been changed to: > Although filter "ID < 100L" is generated by Spark, it fails to pushdown into parquet actually, Spark still does the full table scan when reading. > This PR provides a case-insensitive field resolution to make it work. Let me know if you have any suggestions :). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 I will treat the above case as acceptable and will add a duplicated field check for the parquet schema. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 Both `catalystRequestedSchema` and `parquetSchema` are recursive structures; is there an easy way to find duplicated fields? Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 @cloud-fan I also think my approach changes too much in this PR. > go through the parquet schema and find duplicated field names Even if the user query only references non-duplicated fields, this approach still throws an exception, like below: ``` spark.range(10).selectExpr("id as a", "id as b", "id as B").write.mode("overwrite").parquet("/tmp/data") sql("select a from t").collect ``` Is that acceptable? Or should we do the refactoring in another PR first? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22184: [SPARK-25132][SQL][DOC] Add migration doc for cas...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22184#discussion_r213581563 --- Diff: docs/sql-programming-guide.md --- @@ -1895,6 +1895,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`. - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. +## Upgrading From Spark SQL 2.3.1 to 2.3.2 and above + + - In version 2.3.1 and earlier, when reading from a Parquet table, Spark always returns null for any column whose column names in Hive metastore schema and Parquet schema are in different letter cases, no matter whether `spark.sql.caseSensitive` is set to true or false. Since 2.3.2, when `spark.sql.caseSensitive` is set to false, Spark does case insensitive column name resolution between Hive metastore schema and Parquet schema, so even column names are in different letter cases, Spark returns corresponding column values. An exception is thrown if there is ambiguity, i.e. more than one Parquet column is matched. --- End diff -- The testing assumes `spark.sql.hive.convertMetastoreParquet` is set to false, so it should use the Hive serde reader instead of the Spark reader; sorry if that was confusing. I guess you mean 1 and 3 :). I understand now. If we are not going to backport the PR to 2.3, should I close SPARK-25206 as well? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22197: [SPARK-25207][SQL] Case-insensitve field resoluti...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22197#discussion_r213551202 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -350,25 +356,38 @@ private[parquet] class ParquetFilters( } /** - * Returns a map from name of the column to the data type, if predicate push down applies. + * Returns a map, which contains parquet field name and data type, if predicate push down applies. */ - private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType match { -case m: MessageType => - // Here we don't flatten the fields in the nested schema but just look up through - // root fields. Currently, accessing to nested fields does not push down filters - // and it does not support to create filters for them. - m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => -f.getName -> ParquetSchemaType( - f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata) - }.toMap -case _ => Map.empty[String, ParquetSchemaType] + private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = { +// Here we don't flatten the fields in the nested schema but just look up through +// root fields. Currently, accessing to nested fields does not push down filters +// and it does not support to create filters for them. +val primitiveFields = + dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => +f.getName -> ParquetField(f.getName, + ParquetSchemaType(f.getOriginalType, +f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)) + } +if (caseSensitive) { + primitiveFields.toMap +} else { + // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive + // mode, just skip pushdown for these fields, they will trigger Exception when reading, + // See: SPARK-25132. --- End diff -- @cloud-fan, it is a great idea, thanks! 
I think the answer is not to "dedup" before pushdown and pruning. Maybe we should clip the parquet schema before pushdown and pruning. If duplicated fields are detected, throw an exception. If not, pass the clipped parquet schema via the Hadoop conf to the parquet lib. ``` catalystRequestedSchema = { val conf = context.getConfiguration val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA) assert(schemaString != null, "Parquet requested schema not set.") StructType.fromString(schemaString) } val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValue.get) val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema( context.getFileSchema, catalystRequestedSchema, caseSensitive) ``` I am trying this approach and will update soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22184: [SPARK-25132][SQL][DOC] Add migration doc for cas...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22184#discussion_r213519348 --- Diff: docs/sql-programming-guide.md --- @@ -1895,6 +1895,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`. - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. +## Upgrading From Spark SQL 2.3.1 to 2.3.2 and above + + - In version 2.3.1 and earlier, when reading from a Parquet table, Spark always returns null for any column whose column names in Hive metastore schema and Parquet schema are in different letter cases, no matter whether `spark.sql.caseSensitive` is set to true or false. Since 2.3.2, when `spark.sql.caseSensitive` is set to false, Spark does case insensitive column name resolution between Hive metastore schema and Parquet schema, so even column names are in different letter cases, Spark returns corresponding column values. An exception is thrown if there is ambiguity, i.e. more than one Parquet column is matched. --- End diff -- @gatorsmile I think 1 and 2 are always consistent. They both use Spark reader. Am I wrong? > parquet table created by Spark (using parquet) read by Spark reader > parquet table created by Spark (using hive) read by Spark reader --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22184: [SPARK-25132][SQL][DOC] Add migration doc for cas...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22184#discussion_r213386126 --- Diff: docs/sql-programming-guide.md --- @@ -1895,6 +1895,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`. - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. +## Upgrading From Spark SQL 2.3.1 to 2.3.2 and above + + - In version 2.3.1 and earlier, when reading from a Parquet table, Spark always returns null for any column whose column names in Hive metastore schema and Parquet schema are in different letter cases, no matter whether `spark.sql.caseSensitive` is set to true or false. Since 2.3.2, when `spark.sql.caseSensitive` is set to false, Spark does case insensitive column name resolution between Hive metastore schema and Parquet schema, so even column names are in different letter cases, Spark returns corresponding column values. An exception is thrown if there is ambiguity, i.e. more than one Parquet column is matched. --- End diff -- > For Spark native parquet tables that were created by us, this is a bug fix because the previous work does not respect spark.sql.caseSensitive; for the parquet tables created by Hive, the field resolution should be consistent no matter whether it is using our reader or Hive parquet reader. @gatorsmile, I need to confirm with you what "consistent" covers; we have several kinds of tables: 1. parquet table created by Spark (using parquet) read by Spark reader 2. parquet table created by Spark (using hive) read by Spark reader 3. parquet table created by Spark (using hive) read by Hive reader 4. parquet table created by Hive read by Spark reader 5. 
parquet table created by Hive read by Hive reader Do you want all of them to be consistent? Or is it enough for 2, 3, 4 and 5 to be consistent? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 @dongjoon-hyun In the **schema matched case** as you listed, it is expected behavior in current master. ``` spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024) spark.range(1, 40 * 1024 * 1024, 1, 1).sortWithinPartitions("id").write.mode("overwrite").parquet("/tmp/t") sql("CREATE TABLE t (id LONG) USING parquet LOCATION '/tmp/t'") // master and 2.3 have different plan for top limit (see below), that's why 28.4 MB are read in master sql("select * from t where id < 100L").show() ``` This difference is probably introduced by #21573, @cloud-fan, current master read more data than 2.3 for top limit like in https://github.com/apache/spark/pull/22197#issuecomment-416085556 , is it a regression or not? Master: ![image](https://user-images.githubusercontent.com/2989575/44654328-e0639500-aa23-11e8-94c5-5ba8f9f87fd8.png) 2.3 branch: ![image](https://user-images.githubusercontent.com/2989575/44654336-e9ecfd00-aa23-11e8-89ec-a1c0085e2c36.png) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 @gatorsmile I can help check `spark.sql.caseSensitive` for all the built-in data sources. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22197: [SPARK-25207][SQL] Case-insensitve field resoluti...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22197#discussion_r212814524 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -350,25 +356,38 @@ private[parquet] class ParquetFilters( } /** - * Returns a map from name of the column to the data type, if predicate push down applies. + * Returns a map, which contains parquet field name and data type, if predicate push down applies. */ - private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType match { -case m: MessageType => - // Here we don't flatten the fields in the nested schema but just look up through - // root fields. Currently, accessing to nested fields does not push down filters - // and it does not support to create filters for them. - m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => -f.getName -> ParquetSchemaType( - f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata) - }.toMap -case _ => Map.empty[String, ParquetSchemaType] + private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = { +// Here we don't flatten the fields in the nested schema but just look up through +// root fields. Currently, accessing to nested fields does not push down filters +// and it does not support to create filters for them. +val primitiveFields = + dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => +f.getName -> ParquetField(f.getName, + ParquetSchemaType(f.getOriginalType, +f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)) + } +if (caseSensitive) { + primitiveFields.toMap +} else { + // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive + // mode, just skip pushdown for these fields, they will trigger Exception when reading, + // See: SPARK-25132. --- End diff -- It is a good question! Let's see the scenario like below: 1. 
parquet file has duplicate fields "a INT, A INT". 2. user wants to pushdown "A > 0". Without dedup, we possible pushdown "a > 0" instead of "A > 0", although it is wrong, it will still trigger the Exception finally when reading parquet, so whether dedup or not, we will get the same result. @cloud-fan , @gatorsmile any idea? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22197: [SPARK-25207][SQL] Case-insensitve field resoluti...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22197#discussion_r212813302 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala --- @@ -1021,6 +1022,116 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } } + + test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet") { +val caseSensitiveParquetFilters = + new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, +conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, +conf.parquetFilterPushDownInFilterThreshold, caseSensitive = true) + +val caseInsensitiveParquetFilters = + new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, +conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, +conf.parquetFilterPushDownInFilterThreshold, caseSensitive = false) + +def testCaseInsensitiveResolution( +schema: StructType, +expected: FilterPredicate, +filter: sources.Filter): Unit = { + val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema) + + assertResult(Some(expected)) { +caseInsensitiveParquetFilters.createFilter(parquetSchema, filter) + } + assertResult(None) { +caseSensitiveParquetFilters.createFilter(parquetSchema, filter) + } +} + +val schema = StructType(Seq(StructField("cint", IntegerType))) + +testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), null.asInstanceOf[Integer]), sources.IsNull("CINT")) + +testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), null.asInstanceOf[Integer]), + sources.IsNotNull("CINT")) + +testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualTo("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), 1000: Integer), + sources.Not(sources.EqualTo("CINT", 1000))) + 
+testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualNullSafe("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), 1000: Integer), + sources.Not(sources.EqualNullSafe("CINT", 1000))) + +testCaseInsensitiveResolution( + schema, + FilterApi.lt(intColumn("cint"), 1000: Integer), sources.LessThan("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.ltEq(intColumn("cint"), 1000: Integer), + sources.LessThanOrEqual("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, FilterApi.gt(intColumn("cint"), 1000: Integer), sources.GreaterThan("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.gtEq(intColumn("cint"), 1000: Integer), + sources.GreaterThanOrEqual("CINT", 1000)) --- End diff -- Each test is corresponding to one line code change in `createFilter`. Like: ``` case sources.IsNull(name) if canMakeFilterOn(name, null) => makeEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, null)) ``` All tests together can cover all my change in `createFilter`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22197: [SPARK-25207][SQL] Case-insensitve field resoluti...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22197#discussion_r212813240 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala --- @@ -1021,6 +1022,116 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } } + + test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet") { +val caseSensitiveParquetFilters = + new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, +conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, +conf.parquetFilterPushDownInFilterThreshold, caseSensitive = true) + +val caseInsensitiveParquetFilters = + new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, +conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, +conf.parquetFilterPushDownInFilterThreshold, caseSensitive = false) --- End diff -- Good idea, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22197: [SPARK-25207][SQL] Case-insensitve field resoluti...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22197#discussion_r212801680 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala --- @@ -1021,6 +1021,88 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } } + + test("SPARK-25132: Case-insensitive field resolution for pushdown when reading parquet") { +val caseSensitiveParquetFilters = + new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, +conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, +conf.parquetFilterPushDownInFilterThreshold, caseSensitive = true) +val caseInsensitiveParquetFilters = + new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, +conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, +conf.parquetFilterPushDownInFilterThreshold, caseSensitive = false) +def testCaseInsensitiveResolution( +schema: StructType, +expected: FilterPredicate, +filter: sources.Filter): Unit = { + val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema) + + assertResult(Some(expected)) { +caseInsensitiveParquetFilters.createFilter(parquetSchema, filter) + } + assertResult(None) { +caseSensitiveParquetFilters.createFilter(parquetSchema, filter) + } +} + +val schema = StructType(Seq(StructField("cint", IntegerType))) + +testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), null.asInstanceOf[Integer]), sources.IsNull("CINT")) + +testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), null.asInstanceOf[Integer]), + sources.IsNotNull("CINT")) + +testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualTo("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), 1000: Integer), + sources.Not(sources.EqualTo("CINT", 1000))) + 
+testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualNullSafe("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), 1000: Integer), + sources.Not(sources.EqualNullSafe("CINT", 1000))) + +testCaseInsensitiveResolution( + schema, + FilterApi.lt(intColumn("cint"), 1000: Integer), sources.LessThan("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.ltEq(intColumn("cint"), 1000: Integer), + sources.LessThanOrEqual("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, FilterApi.gt(intColumn("cint"), 1000: Integer), sources.GreaterThan("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.gtEq(intColumn("cint"), 1000: Integer), + sources.GreaterThanOrEqual("CINT", 1000)) + +testCaseInsensitiveResolution( + schema, + FilterApi.or( +FilterApi.eq(intColumn("cint"), 10: Integer), +FilterApi.eq(intColumn("cint"), 20: Integer)), + sources.In("CINT", Array(10, 20))) + +val dupFieldSchema = StructType( + Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType))) +val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema) +assertResult(None) { + caseInsensitiveParquetFilters.createFilter( +dupParquetSchema, sources.EqualTo("CINT", 1000)) +} --- End diff -- Added, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22197: [SPARK-25207][SQL] Case-insensitve field resolution for ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22197 @cloud-fan @HyukjinKwon It seems we cannot simply add `originalName` into `ParquetSchemaType`, because we need the exact ParquetSchemaType info for type matching, like: ``` private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null) private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null) private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null) ... private val makeEq: = { case ParquetByteType | ParquetShortType | ParquetIntegerType => ``` I use a new case class `ParquetField` to reduce the map. ``` private case class ParquetField( name: String, schema: ParquetSchemaType) ``` Let me know if you are OK with this approach. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22197: [SPARK-25207][SQL] Case-insensitve field resoluti...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22197#discussion_r212798970 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -350,25 +352,43 @@ private[parquet] class ParquetFilters( } /** - * Returns a map from name of the column to the data type, if predicate push down applies. + * Returns nameMap and typeMap based on different case sensitive mode, if predicate push --- End diff -- Great idea! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22197: [SPARK-25207][SQL] Case-insensitve field resoluti...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22197#discussion_r212788978 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala --- @@ -377,7 +378,7 @@ class ParquetFileFormat // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` // is used here. - .flatMap(parquetFilters.createFilter(parquetSchema, _)) + .flatMap(parquetFilters.createFilter(parquetSchema, _, isCaseSensitive)) --- End diff -- Yes, that way is better. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22183: [SPARK-25132][SQL][BACKPORT-2.3] Case-insensitive field ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22183 We need to backport it. Without this PR, we cannot solve the data issue in [SPARK-25206] Wrong data may be returned when enable pushdown. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22197: [SPARK-25207][SQL] Case-insensitve field resoluti...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/22197 [SPARK-25207][SQL] Case-insensitve field resolution for filter pushdown when reading Parquet ## What changes were proposed in this pull request? Currently, filter pushdown does not work if the Parquet schema and the Hive metastore schema use different letter cases, even if spark.sql.caseSensitive is false, as in the case below: ```scala spark.range(10).write.parquet("/tmp/data") sql("DROP TABLE t") sql("CREATE TABLE t (ID LONG) USING parquet LOCATION '/tmp/data'") sql("select * from t where id > 0").show ``` No filter will be generated. This PR provides case-insensitive field resolution for filter pushdown. ## How was this patch tested? Added UTs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark SPARK-25207 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22197.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22197 commit 5902afe6fb6e88f98fb4f2649e59156264bc3e4d Author: yucai Date: 2018-08-23T07:16:42Z [SPARK-25207][SQL] Case-insensitve field resolution for filter pushdown when reading Parquet --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22148: [SPARK-25132][SQL] Case-insensitive field resolution whe...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22148 I triggered it 3 hours ago, but I see many Jenkins submissions still in the queue. And it says "Jenkins is about to shut down"? ![image](https://user-images.githubusercontent.com/2989575/44340714-44cca480-a4b8-11e8-8c2d-aa432a9516ca.png) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22148: [SPARK-25132][SQL] Case-insensitive field resolution whe...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22148 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22148: [SPARK-25132][SQL] Case-insensitive field resolution whe...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22148 LGTM. @cloud-fan @gatorsmile Could you kindly help trigger Jenkins and review? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/19788 **Summary** The one-disk-IO solution's performance seems not as good as the current PR19877 implementation. **Benchmark** ```scala spark.range(1, 512000L, 1, 1280).selectExpr("id as key", "id as value").groupBy("key").agg(max("value")).foreach(identity(_)) ``` ![image](https://user-images.githubusercontent.com/2989575/43998875-e606d8d4-9e32-11e8-8e35-a3ceb955aa00.png) All code is based on the same recent master. Current AE: https://github.com/yucai/spark/tree/current_ae Server Side One Disk IO OPT: https://github.com/yucai/spark/tree/one_disk Rebase PR19877: https://github.com/yucai/spark/commits/pr19788_merge **Deep Dive** The one-disk-IO OPT solution still has two disadvantages: 1. It needs to send the shuffle blocks one by one, and the client side needs to process them one by one. In contrast, PR19877 sends all of them at once (logically) and the client side processes them at once. 2. No Netty zero copy. So I ran two more experiments to verify my guess. 1. One Disk IO One Net (I hacked some client side code): https://github.com/yucai/spark/commits/one_disk_one_net_1 2. One Disk IO One Net + Zero Copy (needs a client hack also): https://github.com/yucai/spark/commits/one_disk_one_net_2 ![image](https://user-images.githubusercontent.com/2989575/43998902-6ea317e8-9e33-11e8-9491-cdf95ce2411d.png) After optimizing to "one net", we got similar performance to PR19877. It looks like "one net" is also important, but it needs changes on the client side. @cloud-fan, I understand you may be very busy with 2.4; feel free to ping me if you have any suggestions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22077: [SPARK-25084][SQL][BACKPORT-2.3] "distribute by" on mult...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22077 @LantaoJin KafkaSourceStressForDontFailOnDataLossSuite fails occasionally; thanks @wangyum for retesting. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22077: [SPARK-25084][SQL] "distribute by" on multiple columns (...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22077 ok to test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22077: [SPARK-25084][SQL] "distribute by" on multiple columns (...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22077 @cloud-fan Kindly help review. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22077: [SPARK-25084][SQL] "distribute by" on multiple columns (...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22077 LGTM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22077: [SPARK-25084][SQL] "distribute by" on multiple columns (...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22077 @LantaoJin, could you modify the title and description? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22066: [SPARK-25084][SQL] "distribute by" on multiple columns (...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22066 @cloud-fan Synced with @LantaoJin he will help port to 2.3 soon and I will review it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22066: [SPARK-25084][SQL] "distribute by" on multiple columns (...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22066 @cloud-fan @jerryshao sure, I will do it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22066: [SPARK-25084][SQL] "distribute by" on multiple co...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22066#discussion_r209426886 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala --- @@ -404,21 +404,26 @@ abstract class HashExpression[E] extends Expression { input: String, result: String, fields: Array[StructField]): String = { +val tmpInput = ctx.freshName("input") val fieldsHash = fields.zipWithIndex.map { case (field, index) => - nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx) + nullSafeElementHash(tmpInput, index.toString, field.nullable, field.dataType, result, ctx) } val hashResultType = CodeGenerator.javaType(dataType) -ctx.splitExpressions( +val code = ctx.splitExpressions( expressions = fieldsHash, funcName = "computeHashForStruct", - arguments = Seq("InternalRow" -> input, hashResultType -> result), + arguments = Seq("InternalRow" -> tmpInput, hashResultType -> result), returnType = hashResultType, makeSplitFunction = body => s""" |$body |return $result; """.stripMargin, foldFunctions = _.map(funcCall => s"$result = $funcCall;").mkString("\n")) +s""" + |final InternalRow $tmpInput = $input; --- End diff -- Yes, very agree, we can improve this in the future. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22066: [SPARK-25084][SQL] "distribute by" on multiple columns (...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22066 @LantaoJin I realized the initial way had some issue, so I marked it as WIP to refine and add test. It is different from your original implementation, so I would like to use this one. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22066: [SPARK-25084][SQL] "distribute by" on multiple columns (...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22066 @cloud-fan Jira and 1st is from this one. It is critical to our 2.3 migration. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22066: [SPARK-25084][SQL] "distribute by" on multiple co...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22066#discussion_r209267827 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala --- @@ -778,21 +783,22 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] { input: String, result: String, fields: Array[StructField]): String = { +val tmpInput = ctx.freshName("input") --- End diff -- Seems like `HiveHash` cannot be triggered in the normal way. Because Spark uses `Murmur3Hash`. But this function does have this issue. You can hack to test in this way. In `HashPartitioning`: ``` def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions)) ``` to ``` def partitionIdExpression: Expression = Pmod(new HiveHash(expressions), Literal(numPartitions)) ``` Then run tests: ``` val df = spark.range(1000) val columns = (0 until 400).map{ i => s"id as id$i" } val distributeExprs = (0 until 100).map(c => s"id$c").mkString(",") df.selectExpr(columns : _*).createTempView("test") spark.sql(s"select * from test distribute by ($distributeExprs)").count() ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22066: [SPARK-25084][SQL] "distribute by" on multiple co...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22066#discussion_r209265323 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala --- @@ -2831,4 +2831,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-25084: 'distribute by' on multiple columns may lead to codegen issue") { +withView("spark_25084") { + val count = 1000 --- End diff -- How to inline? We still use it in the assert. ``` assert( spark.sql(s"select * from spark_25084 distribute by ($distributeExprs)").count() === count) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22066: [SPARK-25084][SQL] "distribute by" on multiple columns m...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22066 @cloud-fan @gatorsmile The PR is ready; kindly help review it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22066: [WIP][SPARK-25084][SQL] "distribute by" on multiple colu...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/22066 @cloud-fan I am refining and adding tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22066: [SPARK-25084][SQL] "distribute by" on multiple co...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/22066 [SPARK-25084][SQL] "distribute by" on multiple columns may lead to codegen issue ## What changes were proposed in this pull request? "distribute by" on multiple columns may lead to codegen issue ## How was this patch tested? UTs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark SPARK-25084 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22066.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22066 commit 8ee56bbfaacdd64b1712d72650a39939ca3b13f2 Author: yucai Date: 2018-08-10T05:19:43Z "distribute by" on multiple columns may lead to codegen issue --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/19788 @cloud-fan @gatorsmile I am trying the new approach as suggested, and I have a question. If we make it a **purely server-side** optimization, the external shuffle service has no idea how the shuffle data is compressed (concatenatable?) or serialized (relocatable?), so how does it decide whether it can merge the contiguous partitions? One possible solution is to read all contiguous partitions in one shot and then send the data one by one. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/19788 @gatorsmile @cloud-fan @carsonwang I will update it soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21156: [SPARK-24087][SQL] Avoid shuffle when join keys are a su...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/21156 Closed by mistake; reopening it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
GitHub user yucai reopened a pull request: https://github.com/apache/spark/pull/21156 [SPARK-24087][SQL] Avoid shuffle when join keys are a super-set of bucket keys ## What changes were proposed in this pull request? To improve the bucket join, when join keys are a super-set of bucket keys, we should avoid shuffle. ## How was this patch tested? Enable ignored test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark SPARK-24087 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21156.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21156 commit b6bfdc21ed8edf98f9a3b9ac1c253c59adb141a2 Author: yucai Date: 2018-04-25T00:49:43Z [SPARK-24087][SQL] Avoid shuffle when join keys are a super-set of bucket keys commit a59c94f5b655fc034ce8907b98022cacf6bf318e Author: yucai Date: 2018-04-26T04:33:08Z simplify the codes commit 4e026e5e437dc7f578434244b55bb1ebe189bace Author: yucai Date: 2018-06-04T02:22:12Z Add spark.sql.sortMergeJoinExec.childrenPartitioningDetection for user to disable this feature commit fa76a7823baf4e6eb05f33bc746ade7f65f44372 Author: yucai Date: 2018-06-04T05:25:01Z enable spark.sql.sortMergeJoinExec.childrenPartitioningDetection by default commit 946688aee3d03d37a57270e654e00bb9236f21c4 Author: yucai Date: 2018-06-04T05:28:51Z should return commit 981a0fd22d30768ce533982c9fcc701b15d4dc44 Author: yucai Date: 2018-07-06T06:51:24Z skip RangePartition commit 76e7d5f67017604c29179ce55280e0fc56574fde Author: yucai Date: 2018-07-09T10:14:43Z Merge remote-tracking branch 'origin/master' into pr21156 commit 371c3a932f4dede4aeb1be2c9db404b457547ecf Author: yucai Date: 2018-07-09T11:33:17Z improve tests commit de2bc4de76077f257b85e6a1d58ee17fbc770c8e Author: yucai Date: 2018-07-12T01:43:35Z support shuffled hash join commit f40606203da01efe400431ed9d2b8b70c0476fc6 
Author: yucai Date: 2018-07-26T14:33:40Z remove bucket table check --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user yucai closed the pull request at: https://github.com/apache/spark/pull/21156 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21791: [SPARK-24832][SQL] Improve inputMetrics's bytesRe...
GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/21791 [SPARK-24832][SQL] Improve inputMetrics's bytesRead update for ColumnarBatch ## What changes were proposed in this pull request? Currently, ColumnarBatch's bytesRead is only updated every 4096 * 1000 rows, which makes the metric out of date. This PR updates it for each batch. ## How was this patch tested? Existing UTs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/yucai/spark SPARK-24832 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21791.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21791 commit b9320c8d8a3735d4569709b51a0d66a7121e23cb Author: yucai Date: 2018-07-17T10:20:18Z [SPARK-24832][SQL] Improve inputMetrics's bytesRead update for ColumnarBatch --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21156: [SPARK-24087][SQL] Avoid shuffle when join keys are a su...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/21156 @maryannxue How about this approach? Any better ideas? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org