spark git commit: [SPARK-26252][PYTHON] Add support to run specific unittests and/or doctests in python/run-tests script
Repository: spark
Updated Branches: refs/heads/master 7143e9d72 -> 7e3eb3cd2

[SPARK-26252][PYTHON] Add support to run specific unittests and/or doctests in python/run-tests script

## What changes were proposed in this pull request?

This PR proposes to add a developer option, `--testnames`, to our testing script to allow running a specific set of unittests and doctests.

**1. Run unittests in the class**

```bash
./run-tests --testnames 'pyspark.sql.tests.test_arrow ArrowTests'
```

```
Running PySpark tests. Output is in /.../spark/python/unit-tests.log
Will test against the following Python executables: ['python2.7', 'pypy']
Will test the following Python tests: ['pyspark.sql.tests.test_arrow ArrowTests']
Starting test(python2.7): pyspark.sql.tests.test_arrow ArrowTests
Starting test(pypy): pyspark.sql.tests.test_arrow ArrowTests
Finished test(python2.7): pyspark.sql.tests.test_arrow ArrowTests (14s)
Finished test(pypy): pyspark.sql.tests.test_arrow ArrowTests (14s) ... 22 tests were skipped
Tests passed in 14 seconds

Skipped tests in pyspark.sql.tests.test_arrow ArrowTests with pypy:
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
    test_createDataFrame_fallback_disabled (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
    test_createDataFrame_fallback_enabled (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped
    ...
```

**2. Run a single unittest in the class.**

```bash
./run-tests --testnames 'pyspark.sql.tests.test_arrow ArrowTests.test_null_conversion'
```

```
Running PySpark tests. Output is in /.../spark/python/unit-tests.log
Will test against the following Python executables: ['python2.7', 'pypy']
Will test the following Python tests: ['pyspark.sql.tests.test_arrow ArrowTests.test_null_conversion']
Starting test(pypy): pyspark.sql.tests.test_arrow ArrowTests.test_null_conversion
Starting test(python2.7): pyspark.sql.tests.test_arrow ArrowTests.test_null_conversion
Finished test(pypy): pyspark.sql.tests.test_arrow ArrowTests.test_null_conversion (0s) ... 1 tests were skipped
Finished test(python2.7): pyspark.sql.tests.test_arrow ArrowTests.test_null_conversion (8s)
Tests passed in 8 seconds

Skipped tests in pyspark.sql.tests.test_arrow ArrowTests.test_null_conversion with pypy:
    test_null_conversion (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
```

**3. Run doctests in a single PySpark module.**

```bash
./run-tests --testnames pyspark.sql.dataframe
```

```
Running PySpark tests. Output is in /.../spark/python/unit-tests.log
Will test against the following Python executables: ['python2.7', 'pypy']
Will test the following Python tests: ['pyspark.sql.dataframe']
Starting test(pypy): pyspark.sql.dataframe
Starting test(python2.7): pyspark.sql.dataframe
Finished test(python2.7): pyspark.sql.dataframe (47s)
Finished test(pypy): pyspark.sql.dataframe (48s)
Tests passed in 48 seconds
```

Of course, you can mix them:

```bash
./run-tests --testnames 'pyspark.sql.tests.test_arrow ArrowTests,pyspark.sql.dataframe'
```

```
Running PySpark tests. Output is in /.../spark/python/unit-tests.log
Will test against the following Python executables: ['python2.7', 'pypy']
Will test the following Python tests: ['pyspark.sql.tests.test_arrow ArrowTests', 'pyspark.sql.dataframe']
Starting test(pypy): pyspark.sql.dataframe
Starting test(pypy): pyspark.sql.tests.test_arrow ArrowTests
Starting test(python2.7): pyspark.sql.dataframe
Starting test(python2.7): pyspark.sql.tests.test_arrow ArrowTests
Finished test(pypy): pyspark.sql.tests.test_arrow ArrowTests (0s) ... 22 tests were skipped
Finished test(python2.7): pyspark.sql.tests.test_arrow ArrowTests (18s)
Finished test(python2.7): pyspark.sql.dataframe (50s)
Finished test(pypy): pyspark.sql.dataframe (52s)
Tests passed in 52 seconds

Skipped tests in pyspark.sql.tests.test_arrow ArrowTests with pypy:
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
    test_createDataFrame_fallback_disabled (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
```

You can also use all other options (except `--modules`, which is ignored):

```bash
./run-tests --testnames 'pyspark.sql.tests.test_arrow ArrowTests.test_null_conversion' --python-executables=python
```

```
Running PySpark
```
svn commit: r31362 - in /dev/spark/3.0.0-SNAPSHOT-2018_12_04_21_19-7143e9d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Dec 5 05:32:29 2018 New Revision: 31362 Log: Apache Spark 3.0.0-SNAPSHOT-2018_12_04_21_19-7143e9d docs [This commit notification would consist of 1764 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r31361 - in /dev/spark/3.0.0-SNAPSHOT-2018_12_04_17_11-180f969-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Dec 5 01:23:23 2018 New Revision: 31361 Log: Apache Spark 3.0.0-SNAPSHOT-2018_12_04_17_11-180f969 docs [This commit notification would consist of 1764 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat in order to check properly the limit size
Repository: spark Updated Branches: refs/heads/master 180f969c9 -> 7143e9d72 [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat in order to check properly the limit size ## What changes were proposed in this pull request? The PR starts from the [comment](https://github.com/apache/spark/pull/23124#discussion_r236112390) in the main one and it aims at: - simplifying the code for `MapConcat`; - be more precise in checking the limit size. ## How was this patch tested? existing tests Closes #23217 from mgaido91/SPARK-25829_followup. Authored-by: Marco Gaido Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7143e9d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7143e9d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7143e9d7 Branch: refs/heads/master Commit: 7143e9d7220bd98ceb82c5c5f045108a8a664ec1 Parents: 180f969 Author: Marco Gaido Authored: Wed Dec 5 09:12:24 2018 +0800 Committer: Wenchen Fan Committed: Wed Dec 5 09:12:24 2018 +0800 -- .../expressions/collectionOperations.scala | 77 +--- .../catalyst/util/ArrayBasedMapBuilder.scala| 10 +++ 2 files changed, 12 insertions(+), 75 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7143e9d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index fa8e38a..67f6739 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -554,13 +554,6 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres return null } -val numElements = maps.foldLeft(0L)((sum, ad) => sum + 
ad.numElements()) -if (numElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { - throw new RuntimeException(s"Unsuccessful attempt to concat maps with $numElements " + -s"elements due to exceeding the map size limit " + -s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") -} - for (map <- maps) { mapBuilder.putAll(map.keyArray(), map.valueArray()) } @@ -569,8 +562,6 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val mapCodes = children.map(_.genCode(ctx)) -val keyType = dataType.keyType -val valueType = dataType.valueType val argsName = ctx.freshName("args") val hasNullName = ctx.freshName("hasNull") val builderTerm = ctx.addReferenceObj("mapBuilder", mapBuilder) @@ -610,41 +601,12 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres ) val idxName = ctx.freshName("idx") -val numElementsName = ctx.freshName("numElems") -val finKeysName = ctx.freshName("finalKeys") -val finValsName = ctx.freshName("finalValues") - -val keyConcat = genCodeForArrays(ctx, keyType, false) - -val valueConcat = - if (valueType.sameType(keyType) && - !(CodeGenerator.isPrimitiveType(valueType) && dataType.valueContainsNull)) { -keyConcat - } else { -genCodeForArrays(ctx, valueType, dataType.valueContainsNull) - } - -val keyArgsName = ctx.freshName("keyArgs") -val valArgsName = ctx.freshName("valArgs") - val mapMerge = s""" -|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}]; -|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}]; -|long $numElementsName = 0; |for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { -| $keyArgsName[$idxName] = $argsName[$idxName].keyArray(); -| $valArgsName[$idxName] = $argsName[$idxName].valueArray(); -| $numElementsName += $argsName[$idxName].numElements(); +| $builderTerm.putAll($argsName[$idxName].keyArray(), $argsName[$idxName].valueArray()); |} -|if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { -| throw new RuntimeException("Unsuccessful attempt to concat maps with " + -| $numElementsName + " elements due to exceeding the map size limit " + -| "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}."); -|} -|ArrayData $finKeysName = $keyConcat($keyArgsName, (int) $numElementsName); -|ArrayData $finValsName = $valueConcat($valArgsName, (int) $numElementsName); -
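The shape of this refactor can be sketched language-neutrally: instead of `MapConcat` pre-summing every input map's size (in both the interpreted and the generated code paths), the element-count limit check moves into the builder's `putAll`, so every caller of the builder gets the check for free. A minimal Python sketch of that logic, with illustrative names (the real change is in Spark's Scala `ArrayBasedMapBuilder`, and the limit value mirrors `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH`):

```python
# Illustrative limit; in Spark this is ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
# (Integer.MAX_VALUE - 15).
MAX_ROUNDED_ARRAY_LENGTH = 2**31 - 16


class MapBuilderSketch:
    """Hypothetical stand-in for ArrayBasedMapBuilder: enforces the size
    limit incrementally on every putAll, instead of requiring each caller
    (such as MapConcat) to pre-sum all inputs."""

    def __init__(self, limit=MAX_ROUNDED_ARRAY_LENGTH):
        self.limit = limit
        self.num_elements = 0

    def put_all(self, keys, values):
        # Running total is checked per batch of entries.
        self.num_elements += len(keys)
        if self.num_elements > self.limit:
            raise RuntimeError(
                f"Unsuccessful attempt to concat maps with "
                f"{self.num_elements} elements due to exceeding the map "
                f"size limit {self.limit}.")
```

With the check centralized this way, the generated code for `MapConcat` reduces to a loop of `putAll` calls, which is why the diff above removes roughly 75 lines.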
svn commit: r31359 - in /dev/spark/2.4.1-SNAPSHOT-2018_12_04_15_04-51739d1-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Dec 4 23:18:58 2018 New Revision: 31359 Log: Apache Spark 2.4.1-SNAPSHOT-2018_12_04_15_04-51739d1 docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-26094][CORE][STREAMING] createNonEcFile creates parent dirs.
Repository: spark Updated Branches: refs/heads/master 35f9163ad -> 180f969c9 [SPARK-26094][CORE][STREAMING] createNonEcFile creates parent dirs. ## What changes were proposed in this pull request? We explicitly avoid files with hdfs erasure coding for the streaming WAL and for event logs, as hdfs EC does not support all relevant apis. However, the new builder api used has different semantics -- it does not create parent dirs, and it does not resolve relative paths. This updates createNonEcFile to have similar semantics to the old api. ## How was this patch tested? Ran tests with the WAL pointed at a non-existent dir, which failed before this change. Manually tested the new function with a relative path as well. Unit tests via jenkins. Closes #23092 from squito/SPARK-26094. Authored-by: Imran Rashid Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/180f969c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/180f969c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/180f969c Branch: refs/heads/master Commit: 180f969c97a66b4c265e5fad8272665a00572f1a Parents: 35f9163 Author: Imran Rashid Authored: Tue Dec 4 14:35:04 2018 -0800 Committer: Marcelo Vanzin Committed: Tue Dec 4 14:35:04 2018 -0800 -- .../main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 8 +++- 1 file changed, 7 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/180f969c/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 7bb2a41..9371992 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -466,7 +466,13 @@ private[spark] object SparkHadoopUtil { try { // Use reflection as this uses apis only 
available in hadoop 3 val builderMethod = fs.getClass().getMethod("createFile", classOf[Path]) - val builder = builderMethod.invoke(fs, path) + // the builder api does not resolve relative paths, nor does it create parent dirs, while + // the old api does. + if (!fs.mkdirs(path.getParent())) { +throw new IOException(s"Failed to create parents of $path") + } + val qualifiedPath = fs.makeQualified(path) + val builder = builderMethod.invoke(fs, qualifiedPath) val builderCls = builder.getClass() // this may throw a NoSuchMethodException if the path is not on hdfs val replicateMethod = builderCls.getMethod("replicate")
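The semantic gap this patch closes can be illustrated without Hadoop at all: before handing a path to a create call that neither makes parent directories nor resolves relative paths, do both explicitly. A Python sketch of that idea using the local filesystem (illustrative only; the actual fix is Scala against the Hadoop `FileSystem` builder API obtained via reflection):

```python
import os


def create_file_with_parents(path):
    """Sketch of the fixed createNonEcFile semantics: the old create() API
    made parent dirs and resolved relative paths implicitly; the builder
    API does neither, so do both up front."""
    # Resolve relative paths first (the builder API would not).
    qualified = os.path.abspath(path)
    # Create missing parent directories (the builder API would fail instead).
    os.makedirs(os.path.dirname(qualified), exist_ok=True)
    return open(qualified, "w")
```

This mirrors why pointing the streaming WAL at a non-existent directory failed before the change: the new builder-based call surfaced a missing-parent error that the old API had silently papered over.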
spark git commit: [SPARK-26119][CORE][WEBUI] Task summary table should contain only successful tasks' metrics
Repository: spark Updated Branches: refs/heads/branch-2.4 a091216a6 -> 51739d1ae [SPARK-26119][CORE][WEBUI] Task summary table should contain only successful tasks' metrics ## What changes were proposed in this pull request? Task summary table in the stage page currently displays the summary of all the tasks. However, we should display the task summary of only successful tasks, to follow the behavior of previous versions of spark. ## How was this patch tested? Added UT. attached screenshot Before patch: ![screenshot from 2018-11-20 00-36-18](https://user-images.githubusercontent.com/23054875/48729339-62e3a580-ec5d-11e8-81f0-0d191a234ffe.png) ![screenshot from 2018-11-20 01-18-37](https://user-images.githubusercontent.com/23054875/48731112-41d18380-ec62-11e8-8c31-1ffbfa04e746.png) Closes #23088 from shahidki31/summaryMetrics. Authored-by: Shahid Signed-off-by: Marcelo Vanzin (cherry picked from commit 35f9163adf5c067229afbe57ed60d5dd5f2422c8) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51739d1a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51739d1a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51739d1a Branch: refs/heads/branch-2.4 Commit: 51739d1ae06801138f06dcc441fc10c9b821789b Parents: a091216 Author: Shahid Authored: Tue Dec 4 11:00:58 2018 -0800 Committer: Marcelo Vanzin Committed: Tue Dec 4 11:01:10 2018 -0800 -- .../apache/spark/status/AppStatusStore.scala| 73 ++-- .../spark/status/AppStatusStoreSuite.scala | 33 - 2 files changed, 81 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/51739d1a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index e237281..84716f8 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -146,11 +146,20 @@ private[spark] class AppStatusStore( // cheaper for disk stores (avoids deserialization). val count = { Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(TaskIndexNames.EXEC_RUN_TIME) - .first(0L) - .closeableIterator() +if (store.isInstanceOf[InMemoryStore]) { + store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +.index(TaskIndexNames.STATUS) +.first("SUCCESS") +.last("SUCCESS") +.closeableIterator() +} else { + store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +.index(TaskIndexNames.EXEC_RUN_TIME) +.first(0L) +.closeableIterator() +} ) { it => var _count = 0L while (it.hasNext()) { @@ -219,30 +228,50 @@ private[spark] class AppStatusStore( // stabilize once the stage finishes. It's also slow, especially with disk stores. val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } +// TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119). +// For InMemory case, it is efficient to find using the following code. But for diskStore case +// we need an efficient solution to avoid deserialization time overhead. For that, we need to +// rework on the way indexing works, so that we can index by specific metrics for successful +// and failed tasks differently (would be tricky). Also would require changing the disk store +// version (to invalidate old stores). 
def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) + if (store.isInstanceOf[InMemoryStore]) { +val quantileTasks = store.view(classOf[TaskDataWrapper]) .parent(stageKey) .index(index) .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble + .asScala + .filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks + .toIndexedSeq + +indices.map { index => + fn(quantileTasks(index.toInt)).toDouble +}.toIndexedSeq + } else { +
spark git commit: [SPARK-26119][CORE][WEBUI] Task summary table should contain only successful tasks' metrics
Repository: spark Updated Branches: refs/heads/master 556d83e0d -> 35f9163ad [SPARK-26119][CORE][WEBUI] Task summary table should contain only successful tasks' metrics ## What changes were proposed in this pull request? Task summary table in the stage page currently displays the summary of all the tasks. However, we should display the task summary of only successful tasks, to follow the behavior of previous versions of spark. ## How was this patch tested? Added UT. attached screenshot Before patch: ![screenshot from 2018-11-20 00-36-18](https://user-images.githubusercontent.com/23054875/48729339-62e3a580-ec5d-11e8-81f0-0d191a234ffe.png) ![screenshot from 2018-11-20 01-18-37](https://user-images.githubusercontent.com/23054875/48731112-41d18380-ec62-11e8-8c31-1ffbfa04e746.png) Closes #23088 from shahidki31/summaryMetrics. Authored-by: Shahid Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35f9163a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35f9163a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35f9163a Branch: refs/heads/master Commit: 35f9163adf5c067229afbe57ed60d5dd5f2422c8 Parents: 556d83e Author: Shahid Authored: Tue Dec 4 11:00:58 2018 -0800 Committer: Marcelo Vanzin Committed: Tue Dec 4 11:00:58 2018 -0800 -- .../apache/spark/status/AppStatusStore.scala| 73 ++-- .../spark/status/AppStatusStoreSuite.scala | 33 - 2 files changed, 81 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/35f9163a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 5c0ed4d..b35781c 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -148,11 +148,20 @@ private[spark] 
class AppStatusStore( // cheaper for disk stores (avoids deserialization). val count = { Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(TaskIndexNames.EXEC_RUN_TIME) - .first(0L) - .closeableIterator() +if (store.isInstanceOf[InMemoryStore]) { + store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +.index(TaskIndexNames.STATUS) +.first("SUCCESS") +.last("SUCCESS") +.closeableIterator() +} else { + store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +.index(TaskIndexNames.EXEC_RUN_TIME) +.first(0L) +.closeableIterator() +} ) { it => var _count = 0L while (it.hasNext()) { @@ -221,30 +230,50 @@ private[spark] class AppStatusStore( // stabilize once the stage finishes. It's also slow, especially with disk stores. val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } +// TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119). +// For InMemory case, it is efficient to find using the following code. But for diskStore case +// we need an efficient solution to avoid deserialization time overhead. For that, we need to +// rework on the way indexing works, so that we can index by specific metrics for successful +// and failed tasks differently (would be tricky). Also would require changing the disk store +// version (to invalidate old stores). 
def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) + if (store.isInstanceOf[InMemoryStore]) { +val quantileTasks = store.view(classOf[TaskDataWrapper]) .parent(stageKey) .index(index) .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble + .asScala + .filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks + .toIndexedSeq + +indices.map { index => + fn(quantileTasks(index.toInt)).toDouble +}.toIndexedSeq + } else { +Utils.tryWithResource( + store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +
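The in-memory branch of the patch above boils down to: restrict to successful tasks, sort by the metric, and read off precomputed quantile indices (`min((q * count).toLong, count - 1)`). A small Python sketch of that logic with illustrative names (the real code is Scala in `AppStatusStore.scanTasks`, and the disk-store branch still scans an iterator, as the TODO in the diff notes):

```python
def task_metric_quantiles(tasks, metric, quantiles):
    """Sketch of the InMemoryStore path: filter to SUCCESS tasks, sort the
    chosen metric, and index directly at each quantile position."""
    successful = sorted(t[metric] for t in tasks if t["status"] == "SUCCESS")
    count = len(successful)
    # Same index formula as the Scala code: min((q * count).toLong, count - 1)
    indices = [min(int(q * count), count - 1) for q in quantiles]
    return [float(successful[i]) for i in indices]
```

Filtering before indexing is the whole point of the fix: previously a long-running failed task could dominate the summary table's percentiles even though it contributed no successful work.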
spark git commit: [SPARK-26233][SQL] CheckOverflow when encoding a decimal value
Repository: spark Updated Branches: refs/heads/master f982ca07e -> 556d83e0d [SPARK-26233][SQL] CheckOverflow when encoding a decimal value ## What changes were proposed in this pull request? When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations. ## How was this patch tested? added UT Closes #23210 from mgaido91/SPARK-26233. Authored-by: Marco Gaido Signed-off-by: Dongjoon Hyun Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/556d83e0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/556d83e0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/556d83e0 Branch: refs/heads/master Commit: 556d83e0d87a8f899f29544eb5ca4999a84c96c1 Parents: f982ca0 Author: Marco Gaido Authored: Tue Dec 4 10:33:27 2018 -0800 Committer: Dongjoon Hyun Committed: Tue Dec 4 10:33:27 2018 -0800 -- .../org/apache/spark/sql/catalyst/encoders/RowEncoder.scala | 4 ++-- .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 9 + 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/556d83e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index d905f8f..8ca3d35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -106,12 +106,12 @@ object RowEncoder { returnNullable = false) case d: DecimalType => - StaticInvoke( + CheckOverflow(StaticInvoke( Decimal.getClass, d, "fromDecimal", inputObject :: Nil, -returnNullable = false) +returnNullable = false), d) case StringType => StaticInvoke( http://git-wip-us.apache.org/repos/asf/spark/blob/556d83e0/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 0f90083..525c7ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1647,6 +1647,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { checkDataset(ds, data: _*) checkAnswer(ds.select("x"), Seq(Row(1), Row(2))) } + + test("SPARK-26233: serializer should enforce decimal precision and scale") { +val s = StructType(Seq(StructField("a", StringType), StructField("b", DecimalType(38, 8 +val encoder = RowEncoder(s) +implicit val uEnc = encoder +val df = spark.range(2).map(l => Row(l.toString, BigDecimal.valueOf(l + 0.))) +checkAnswer(df.groupBy(col("a")).agg(first(col("b"))), + Seq(Row("0", BigDecimal.valueOf(0.)), Row("1", BigDecimal.valueOf(1. + } } case class TestDataUnion(x: Int, y: Int, z: Int)
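The essence of wrapping the serializer in `CheckOverflow` is: rescale the incoming value to the declared scale, then verify the result still fits the declared precision, so that downstream code generation can assume every decimal exactly matches its data type. A Python sketch of that check using the `decimal` module (illustrative names and simplified digit counting; the real expression is Spark's Scala `CheckOverflow`):

```python
from decimal import ROUND_HALF_UP, Decimal


def check_overflow(value, precision, scale):
    """Sketch: force `value` to the target scale, then reject it if the
    coefficient has more digits than the declared precision (Spark's
    CheckOverflow yields null in that case)."""
    # Rescale to exactly `scale` fractional digits, e.g. scale=2 -> 0.01.
    rescaled = value.quantize(Decimal(1).scaleb(-scale), rounding=ROUND_HALF_UP)
    # Count coefficient digits as a stand-in for SQL decimal precision.
    if len(rescaled.as_tuple().digits) > precision:
        return None  # overflow: value cannot be represented as (precision, scale)
    return rescaled
```

Without this step, a value whose underlying representation disagreed with its declared precision/scale could silently corrupt later transformations, which is exactly the bug the commit describes.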
svn commit: r31346 - in /dev/spark/3.0.0-SNAPSHOT-2018_12_04_08_34-f982ca0-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Dec 4 16:46:19 2018 New Revision: 31346 Log: Apache Spark 3.0.0-SNAPSHOT-2018_12_04_08_34-f982ca0 docs [This commit notification would consist of 1764 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-26178][SQL] Use java.time API for parsing timestamps and dates from CSV
Repository: spark Updated Branches: refs/heads/master 06a3b6aaf -> f982ca07e [SPARK-26178][SQL] Use java.time API for parsing timestamps and dates from CSV ## What changes were proposed in this pull request? In the PR, I propose to use **java.time API** for parsing timestamps and dates from CSV content with microseconds precision. The SQL config `spark.sql.legacy.timeParser.enabled` allow to switch back to previous behaviour with using `java.text.SimpleDateFormat`/`FastDateFormat` for parsing/generating timestamps/dates. ## How was this patch tested? It was tested by `UnivocityParserSuite`, `CsvExpressionsSuite`, `CsvFunctionsSuite` and `CsvSuite`. Closes #23150 from MaxGekk/time-parser. Lead-authored-by: Maxim Gekk Co-authored-by: Maxim Gekk Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f982ca07 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f982ca07 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f982ca07 Branch: refs/heads/master Commit: f982ca07e80074bdc1e3b742c5e21cf368e4ede2 Parents: 06a3b6a Author: Maxim Gekk Authored: Tue Dec 4 08:36:33 2018 -0600 Committer: Sean Owen Committed: Tue Dec 4 08:36:33 2018 -0600 -- docs/sql-migration-guide-upgrade.md | 2 + .../spark/sql/catalyst/csv/CSVInferSchema.scala | 15 +- .../spark/sql/catalyst/csv/CSVOptions.scala | 10 +- .../sql/catalyst/csv/UnivocityGenerator.scala | 14 +- .../sql/catalyst/csv/UnivocityParser.scala | 38 ++-- .../sql/catalyst/util/DateTimeFormatter.scala | 179 +++ .../spark/sql/catalyst/util/DateTimeUtils.scala | 2 +- .../org/apache/spark/sql/internal/SQLConf.scala | 9 + .../sql/catalyst/csv/CSVInferSchemaSuite.scala | 7 +- .../sql/catalyst/csv/UnivocityParserSuite.scala | 113 ++-- .../sql/catalyst/util/DateTimeTestUtils.scala | 5 +- .../spark/sql/util/DateTimeFormatterSuite.scala | 103 +++ .../apache/spark/sql/CsvFunctionsSuite.scala| 2 +- .../execution/datasources/csv/CSVSuite.scala| 66 
+++ 14 files changed, 431 insertions(+), 134 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/docs/sql-migration-guide-upgrade.md -- diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 787f4bc..fee0e6d 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -33,6 +33,8 @@ displayTitle: Spark SQL Upgrading Guide - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0. + - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`. + ## Upgrading From Spark SQL 2.3 to 2.4 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below. 
http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 94cb4b1..345dc4d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -22,10 +22,16 @@ import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.DateTimeFormatter import org.apache.spark.sql.types._ -class CSVInferSchema(options: CSVOptions) extends Serializable { +class CSVInferSchema(val options: CSVOptions) extends Serializable { + + @transient + private lazy val timeParser = DateTimeFormatter( +options.timestampFormat, +options.timeZone, +
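The headline of this change is microseconds precision: the legacy `SimpleDateFormat`/`FastDateFormat` path is millisecond-based, while the `java.time` path preserves all six fractional digits of an ISO 8601 timestamp. The same round trip can be sketched in Python with `datetime` (illustrative only; the commit's `DateTimeFormatter` is Scala over `java.time`, and Spark stores timestamps as epoch microseconds):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_timestamp_micros(text, fmt="%Y-%m-%dT%H:%M:%S.%f"):
    """Parse an ISO-8601-style timestamp keeping full microsecond
    precision, and convert it to epoch microseconds with integer
    arithmetic so nothing is rounded away."""
    dt = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return (dt - EPOCH) // timedelta(microseconds=1)
```

A millisecond-based parser would truncate `.123456` to `.123`; keeping the value as integer microseconds is what makes the new CSV parsing lossless.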
svn commit: r31345 - in /dev/spark/2.4.1-SNAPSHOT-2018_12_04_06_20-a091216-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Dec 4 14:35:30 2018 New Revision: 31345 Log: Apache Spark 2.4.1-SNAPSHOT-2018_12_04_06_20-a091216 docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24423][FOLLOW-UP][SQL] Fix error example
Repository: spark Updated Branches: refs/heads/branch-2.4 90fcd12af -> a091216a6 [SPARK-24423][FOLLOW-UP][SQL] Fix error example ## What changes were proposed in this pull request? ![image](https://user-images.githubusercontent.com/5399861/49172173-42ad9800-f37b-11e8-8135-7adc323357ae.png) It will throw: ``` requirement failed: When reading JDBC data sources, users need to specify all or none for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and 'numPartitions' ``` and ``` User-defined partition column subq.c1 not found in the JDBC relation ... ``` This PR fixes the erroneous example. ## How was this patch tested? manual tests Closes #23170 from wangyum/SPARK-24499. Authored-by: Yuming Wang Signed-off-by: Sean Owen (cherry picked from commit 06a3b6aafa510ede2f1376b29a46f99447286c67) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a091216a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a091216a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a091216a Branch: refs/heads/branch-2.4 Commit: a091216a6d34ec998de05dca441ae5a368c13c22 Parents: 90fcd12 Author: Yuming Wang Authored: Tue Dec 4 07:57:58 2018 -0600 Committer: Sean Owen Committed: Tue Dec 4 07:58:12 2018 -0600 -- docs/sql-data-sources-jdbc.md | 6 +++--- .../sql/execution/datasources/jdbc/JDBCOptions.scala | 10 +++--- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 10 +++--- 3 files changed, 17 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a091216a/docs/sql-data-sources-jdbc.md -- diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md index 057e821..0f2bc49 100644 --- a/docs/sql-data-sources-jdbc.md +++ b/docs/sql-data-sources-jdbc.md @@ -64,9 +64,9 @@ the following case-insensitive options: Example: spark.read.format("jdbc") -.option("dbtable", "(select c1, c2 from t1) as subq") -.option("partitionColumn", "subq.c1" 
-.load() + .option("url", jdbcUrl) + .option("query", "select c1, c2 from t1") + .load() http://git-wip-us.apache.org/repos/asf/spark/blob/a091216a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 7dfbb9d..b4469cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -137,9 +137,13 @@ class JDBCOptions( |the partition columns using the supplied subquery alias to resolve any ambiguity. |Example : |spark.read.format("jdbc") - |.option("dbtable", "(select c1, c2 from t1) as subq") - |.option("partitionColumn", "subq.c1" - |.load() + | .option("url", jdbcUrl) + | .option("dbtable", "(select c1, c2 from t1) as subq") + | .option("partitionColumn", "c1") + | .option("lowerBound", "1") + | .option("upperBound", "100") + | .option("numPartitions", "3") + | .load() """.stripMargin ) http://git-wip-us.apache.org/repos/asf/spark/blob/a091216a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7fa0e7f..71e8376 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1348,9 +1348,13 @@ class JDBCSuite extends QueryTest |the partition columns using the supplied subquery alias to resolve any ambiguity. 
|Example : |spark.read.format("jdbc") - |.option("dbtable", "(select c1, c2 from t1) as subq") - |.option("partitionColumn", "subq.c1" - |.load() + | .option("url", jdbcUrl) + | .option("dbtable", "(select c1, c2 from t1) as subq") + | .option("partitionColumn", "c1") + | .option("lowerBound", "1") + | .option("upperBound", "100") + | .option("numPartitions", "3") + |
spark git commit: [SPARK-24423][FOLLOW-UP][SQL] Fix error example
Repository: spark Updated Branches: refs/heads/master 93f5592aa -> 06a3b6aaf [SPARK-24423][FOLLOW-UP][SQL] Fix error example ## What changes were proposed in this pull request? ![image](https://user-images.githubusercontent.com/5399861/49172173-42ad9800-f37b-11e8-8135-7adc323357ae.png) It will throw: ``` requirement failed: When reading JDBC data sources, users need to specify all or none for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and 'numPartitions' ``` and ``` User-defined partition column subq.c1 not found in the JDBC relation ... ``` This PR fixes the erroneous example. ## How was this patch tested? manual tests Closes #23170 from wangyum/SPARK-24499. Authored-by: Yuming Wang Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06a3b6aa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06a3b6aa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06a3b6aa Branch: refs/heads/master Commit: 06a3b6aafa510ede2f1376b29a46f99447286c67 Parents: 93f5592 Author: Yuming Wang Authored: Tue Dec 4 07:57:58 2018 -0600 Committer: Sean Owen Committed: Tue Dec 4 07:57:58 2018 -0600 -- docs/sql-data-sources-jdbc.md | 6 +++--- .../sql/execution/datasources/jdbc/JDBCOptions.scala | 10 +++--- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 10 +++--- 3 files changed, 17 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/06a3b6aa/docs/sql-data-sources-jdbc.md -- diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md index 9a5d0fc..a2b1462 100644 --- a/docs/sql-data-sources-jdbc.md +++ b/docs/sql-data-sources-jdbc.md @@ -64,9 +64,9 @@ the following case-insensitive options: Example: spark.read.format("jdbc") -.option("dbtable", "(select c1, c2 from t1) as subq") -.option("partitionColumn", "subq.c1" -.load() + .option("url", jdbcUrl) + .option("query", "select c1, c2 from t1") + .load() 
http://git-wip-us.apache.org/repos/asf/spark/blob/06a3b6aa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 7dfbb9d..b4469cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -137,9 +137,13 @@ class JDBCOptions( |the partition columns using the supplied subquery alias to resolve any ambiguity. |Example : |spark.read.format("jdbc") - |.option("dbtable", "(select c1, c2 from t1) as subq") - |.option("partitionColumn", "subq.c1" - |.load() + | .option("url", jdbcUrl) + | .option("dbtable", "(select c1, c2 from t1) as subq") + | .option("partitionColumn", "c1") + | .option("lowerBound", "1") + | .option("upperBound", "100") + | .option("numPartitions", "3") + | .load() """.stripMargin ) http://git-wip-us.apache.org/repos/asf/spark/blob/06a3b6aa/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7fa0e7f..71e8376 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1348,9 +1348,13 @@ class JDBCSuite extends QueryTest |the partition columns using the supplied subquery alias to resolve any ambiguity. 
|Example : |spark.read.format("jdbc") - |.option("dbtable", "(select c1, c2 from t1) as subq") - |.option("partitionColumn", "subq.c1" - |.load() + | .option("url", jdbcUrl) + | .option("dbtable", "(select c1, c2 from t1) as subq") + | .option("partitionColumn", "c1") + | .option("lowerBound", "1") + | .option("upperBound", "100") + | .option("numPartitions", "3") + | .load() """.stripMargin val e5 = intercept[RuntimeException] { sql(
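The "all or none" error message quoted in this commit comes from a validation of the four JDBC partitioning options. A minimal self-contained sketch of that rule (the option names mirror the real ones, but `PartitionOptionsCheck` is a simplified stand-in, not Spark's actual `JDBCOptions` code):

```scala
object PartitionOptionsCheck {
  // The four options that must be specified together, or not at all,
  // when reading a JDBC data source with partitioning.
  val partitioningKeys: Seq[String] =
    Seq("partitionColumn", "lowerBound", "upperBound", "numPartitions")

  def validate(options: Map[String, String]): Unit = {
    val present = partitioningKeys.filter(options.contains)
    require(present.isEmpty || present.size == partitioningKeys.size,
      "When reading JDBC data sources, users need to specify all or none " +
        s"for the following options: ${partitioningKeys.mkString("'", "', '", "'")}")
  }
}
```

Supplying only `partitionColumn`, as the old doc example did, trips the `require` and produces the message above; supplying none of the four (a single-partition read) passes.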
spark git commit: [MINOR][SQL] Combine the same codes in test cases
Repository: spark Updated Branches: refs/heads/master 261284842 -> 93f5592aa [MINOR][SQL] Combine the same codes in test cases ## What changes were proposed in this pull request? In DDLSuite, four test cases contain the same code; extracting a shared helper function removes the duplication. ## How was this patch tested? existing tests. Closes #23194 from CarolinePeng/Update_temp. Authored-by: CarolinePeng <00244106@zte.intra> Signed-off-by: Takeshi Yamamuro Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93f5592a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93f5592a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93f5592a Branch: refs/heads/master Commit: 93f5592aa8c1254a93524fda81cf0e418c22cb2f Parents: 2612848 Author: CarolinePeng <00244106@zte.intra> Authored: Tue Dec 4 22:08:16 2018 +0900 Committer: Takeshi Yamamuro Committed: Tue Dec 4 22:08:16 2018 +0900 -- .../spark/sql/execution/command/DDLSuite.scala | 40 1 file changed, 16 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/93f5592a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 9d32fb6..052a5e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -377,41 +377,41 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } - test("CTAS a managed table with the existing empty directory") { -val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1"))) + private def withEmptyDirInTablePath(dirName: String)(f : File => Unit): Unit = { +val tableLoc = + new 
File(spark.sessionState.catalog.defaultTablePath(TableIdentifier(dirName))) try { tableLoc.mkdir() + f(tableLoc) +} finally { + waitForTasksToFinish() + Utils.deleteRecursively(tableLoc) +} + } + + + test("CTAS a managed table with the existing empty directory") { +withEmptyDirInTablePath("tab1") { tableLoc => withTable("tab1") { sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'") checkAnswer(spark.table("tab1"), Row(1, "a")) } -} finally { - waitForTasksToFinish() - Utils.deleteRecursively(tableLoc) } } test("create a managed table with the existing empty directory") { -val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1"))) -try { - tableLoc.mkdir() +withEmptyDirInTablePath("tab1") { tableLoc => withTable("tab1") { sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}") sql("INSERT INTO tab1 VALUES (1, 'a')") checkAnswer(spark.table("tab1"), Row(1, "a")) } -} finally { - waitForTasksToFinish() - Utils.deleteRecursively(tableLoc) } } test("create a managed table with the existing non-empty directory") { withTable("tab1") { - val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1"))) - try { -// create an empty hidden file -tableLoc.mkdir() + withEmptyDirInTablePath("tab1") { tableLoc => val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage") hiddenGarbageFile.createNewFile() val exMsg = "Can not create the managed table('`tab1`'). 
The associated location" @@ -439,28 +439,20 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { }.getMessage assert(ex.contains(exMsgWithDefaultDB)) } - } finally { -waitForTasksToFinish() -Utils.deleteRecursively(tableLoc) } } } test("rename a managed table with existing empty directory") { -val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab2"))) -try { +withEmptyDirInTablePath("tab2") { tableLoc => withTable("tab1") { sql(s"CREATE TABLE tab1 USING $dataSource AS SELECT 1, 'a'") -tableLoc.mkdir() val ex = intercept[AnalysisException] { sql("ALTER TABLE tab1 RENAME TO tab2") }.getMessage val expectedMsg = "Can not rename the managed table('`tab1`'). The associated location" assert(ex.contains(expectedMsg)) } -} finally { - waitForTasksToFinish() - Utils.deleteRecursively(tableLoc) } }
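The refactoring in this commit is the classic loan pattern: acquire the resource, lend it to the test body, and always clean up in `finally`. A self-contained sketch of the same shape (`withEmptyDir` is a hypothetical stand-in for `withEmptyDirInTablePath`, using a temp directory in place of the catalog's default table path):

```scala
import java.io.File
import java.nio.file.Files

object LoanPatternSketch {
  // Create an empty directory named `name`, pass it to the body `f`,
  // and delete it afterwards -- even if `f` throws.
  def withEmptyDir(name: String)(f: File => Unit): Unit = {
    val parent = Files.createTempDirectory("ddl-suite-").toFile
    val dir = new File(parent, name)
    dir.mkdir()
    try f(dir) finally {
      Option(dir.listFiles()).foreach(_.foreach(_.delete()))
      dir.delete()
      parent.delete()
    }
  }
}
```

Each of the four DDLSuite tests then shrinks to just its body, with setup and teardown written once in the helper.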
spark git commit: [SPARK-25374][SQL] SafeProjection supports fallback to an interpreted mode
Repository: spark Updated Branches: refs/heads/master b4dea313c -> 261284842 [SPARK-25374][SQL] SafeProjection supports fallback to an interpreted mode ## What changes were proposed in this pull request? In SPARK-23711, we implemented fallback logic to an interpreted mode for expressions. This PR fixes the code to support the same fallback mode in `SafeProjection`, based on `CodeGeneratorWithInterpretedFallback`. ## How was this patch tested? Added tests in `CodeGeneratorWithInterpretedFallbackSuite` and `UnsafeRowConverterSuite`. Closes #22468 from maropu/SPARK-25374-3. Authored-by: Takeshi Yamamuro Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26128484 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26128484 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26128484 Branch: refs/heads/master Commit: 26128484228089c74517cd15cef0bb4166a4186f Parents: b4dea31 Author: Takeshi Yamamuro Authored: Tue Dec 4 20:20:29 2018 +0800 Committer: Wenchen Fan Committed: Tue Dec 4 20:20:29 2018 +0800 -- .../catalyst/encoders/ExpressionEncoder.scala | 2 +- .../expressions/InterpretedSafeProjection.scala | 125 +++ .../sql/catalyst/expressions/Projection.scala | 34 +++-- .../expressions/CodeGenerationSuite.scala | 2 +- ...eGeneratorWithInterpretedFallbackSuite.scala | 15 +++ .../expressions/ExpressionEvalHelper.scala | 4 +- .../expressions/MutableProjectionSuite.scala| 2 +- .../expressions/UnsafeRowConverterSuite.scala | 89 - .../DeclarativeAggregateEvaluator.scala | 11 +- .../codegen/GeneratedProjectionSuite.scala | 8 +- .../util/ArrayDataIndexedSeqSuite.scala | 4 +- .../org/apache/spark/sql/types/TestUDT.scala| 61 + .../spark/sql/FileBasedDataSourceSuite.scala| 4 +- .../apache/spark/sql/UserDefinedTypeSuite.scala | 105 +--- .../execution/datasources/json/JsonSuite.scala | 4 +- .../datasources/orc/OrcQuerySuite.scala | 4 +- .../hive/execution/AggregationQuerySuite.scala 
| 2 +- .../execution/ObjectHashAggregateSuite.scala| 4 +- .../sql/sources/HadoopFsRelationTest.scala | 2 +- 19 files changed, 371 insertions(+), 111 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26128484/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 589e215..fbf0bd6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -302,7 +302,7 @@ case class ExpressionEncoder[T]( private lazy val inputRow = new GenericInternalRow(1) @transient - private lazy val constructProjection = GenerateSafeProjection.generate(deserializer :: Nil) + private lazy val constructProjection = SafeProjection.create(deserializer :: Nil) /** * Returns a new set (with unique ids) of [[NamedExpression]] that represent the serialized form http://git-wip-us.apache.org/repos/asf/spark/blob/26128484/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala new file mode 100644 index 000..70789da --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the
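The `CodeGeneratorWithInterpretedFallback` pattern that this commit extends to `SafeProjection` is simple in outline: try to create the compiled (codegen) object first, and fall back to an interpreted implementation if that fails. A standalone sketch of the shape (the names echo Spark's, but this is an illustration, not the real API):

```scala
import scala.util.control.NonFatal

object FallbackSketch {
  trait Projection { def apply(row: Seq[Any]): Seq[Any] }

  // Stand-in for the codegen path; here it always fails, to force fallback.
  private def createCodeGenObject(columns: Seq[Int]): Projection =
    throw new UnsupportedOperationException("codegen unavailable in this sketch")

  // Interpreted path: select the requested column positions row by row.
  private def createInterpretedObject(columns: Seq[Int]): Projection =
    (row: Seq[Any]) => columns.map(row(_))

  // Prefer the compiled object; fall back to the interpreted one on failure.
  def create(columns: Seq[Int]): Projection =
    try createCodeGenObject(columns)
    catch { case NonFatal(_) => createInterpretedObject(columns) }
}
```

Callers only see `create`, so a codegen failure (e.g. a method exceeding JVM bytecode limits) degrades to the slower interpreted path instead of failing the query.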