spark git commit: [SPARK-17458][SQL] Alias specified for aggregates in a pivot are not honored
Repository: spark
Updated Branches:
  refs/heads/master 1202075c9 -> b72486f82

[SPARK-17458][SQL] Alias specified for aggregates in a pivot are not honored

## What changes were proposed in this pull request?

This change preserves aliases that are given for pivot aggregations.

## How was this patch tested?

New unit test.

Author: Andrew Ray

Closes #15111 from aray/SPARK-17458.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b72486f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b72486f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b72486f8

Branch: refs/heads/master
Commit: b72486f82dd9920135442191be5d384028e7fb41
Parents: 1202075
Author: Andrew Ray
Authored: Thu Sep 15 21:45:29 2016 +0200
Committer: Herman van Hovell
Committed: Thu Sep 15 21:45:29 2016 +0200

----
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala    | 10 +-
 .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 11 +++
 2 files changed, 20 insertions(+), 1 deletion(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/b72486f8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 92bf8e0..5210f42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -373,7 +373,15 @@ class Analyzer(
     case Pivot(groupByExprs, pivotColumn, pivotValues, aggregates, child) =>
       val singleAgg = aggregates.size == 1
       def outputName(value: Literal, aggregate: Expression): String = {
-        if (singleAgg) value.toString else value + "_" + aggregate.sql
+        if (singleAgg) {
+          value.toString
+        } else {
+          val suffix = aggregate match {
+            case n: NamedExpression => n.name
+            case _ => aggregate.sql
+          }
+          value + "_" + suffix
+        }
       }
       if (aggregates.forall(a => PivotFirst.supportsDataType(a.dataType))) {
         // Since evaluating |pivotValues| if statements for each input row can get slow this is an

http://git-wip-us.apache.org/repos/asf/spark/blob/b72486f8/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
----
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index d5cb5e1..1bbe135 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -197,4 +197,15 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{
       Row(2013, Seq(48000.0, 7.0), Seq(3.0, 7.0)) :: Nil
     )
   }
+
+  test("pivot preserves aliases if given") {
+    assertResult(
+      Array("year", "dotNET_foo", "dotNET_avg(`earnings`)", "Java_foo", "Java_avg(`earnings`)")
+    )(
+      courseSales.groupBy($"year")
+        .pivot("course", Seq("dotNET", "Java"))
+        .agg(sum($"earnings").as("foo"), avg($"earnings")).columns
+    )
+  }
+
 }
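For context, the behavior the new test pins down, as a hedged spark-shell sketch (it reuses the `courseSales` data and column labels from the test above; the exact backtick quoting of unaliased aggregates may vary across versions):

```
// With multiple aggregates, each pivot value gets a per-aggregate suffix.
// Before this patch the alias "foo" was ignored and the aggregate's SQL
// form (sum(`earnings`)) was used instead; now the alias is preserved.
val pivoted = courseSales.groupBy($"year")
  .pivot("course", Seq("dotNET", "Java"))
  .agg(sum($"earnings").as("foo"), avg($"earnings"))

pivoted.columns
// Array(year, dotNET_foo, dotNET_avg(`earnings`), Java_foo, Java_avg(`earnings`))
```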
spark git commit: [SPARK-17484] Prevent invalid block locations from being reported after put() exceptions
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0169c2edc -> 9c23f4408

[SPARK-17484] Prevent invalid block locations from being reported after put() exceptions

## What changes were proposed in this pull request?

If a BlockManager `put()` call failed after the BlockManagerMaster was notified of a block's availability, then incomplete cleanup logic in a `finally` block would never send a second block status message to inform the master of the block's unavailability. This, in turn, leads to fetch failures and used to be capable of causing complete job failures before #15037 was fixed.

This patch addresses this issue via multiple small changes:

- The `finally` block now calls `removeBlockInternal` when cleaning up from a failed `put()`; in addition to removing the `BlockInfo` entry (which was _all_ that the old cleanup logic did), this code (redundantly) tries to remove the block from the memory and disk stores (as an added layer of defense against bugs lower down in the stack) and optionally notifies the master of block removal (which now happens during exception-triggered cleanup).
- When a BlockManager receives a request for a block that it does not have, it will now notify the master to update its block locations. This ensures that bad metadata pointing to non-existent blocks will eventually be fixed. Note that I could have implemented this logic in the block manager client (rather than in the remote server), but that would introduce the problem of distinguishing between transient and permanent failures; on the server, however, we know definitively that the block isn't present.
- Catch `NonFatal` instead of `Exception` to avoid swallowing `InterruptedException`s thrown from synchronous block replication calls.

This patch depends upon the refactorings in #15036, so that other patch will also have to be backported when backporting this fix.

For more background on this issue, including example logs from a real production failure, see [SPARK-17484](https://issues.apache.org/jira/browse/SPARK-17484).

## How was this patch tested?

Two new regression tests in BlockManagerSuite.

Author: Josh Rosen

Closes #15085 from JoshRosen/SPARK-17484.

(cherry picked from commit 1202075c95eabba0ffebc170077df798f271a139)
Signed-off-by: Josh Rosen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c23f440
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c23f440
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c23f440

Branch: refs/heads/branch-2.0
Commit: 9c23f4408d337f4af31ebfbcc78767df67d36aed
Parents: 0169c2e
Author: Josh Rosen
Authored: Thu Sep 15 11:54:17 2016 -0700
Committer: Josh Rosen
Committed: Thu Sep 15 11:54:39 2016 -0700

----
 .../org/apache/spark/storage/BlockManager.scala | 37 +++-
 .../spark/storage/BlockManagerSuite.scala       | 34 ++
 2 files changed, 63 insertions(+), 8 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/9c23f440/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 48db97a..37dfbd6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -279,7 +279,12 @@ private[spark] class BlockManager(
     } else {
       getLocalBytes(blockId) match {
         case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer)
-        case None => throw new BlockNotFoundException(blockId.toString)
+        case None =>
+          // If this block manager receives a request for a block that it doesn't have then it's
+          // likely that the master has outdated block statuses for this block. Therefore, we send
+          // an RPC so that this block is marked as being unavailable from this block manager.
+          reportBlockStatus(blockId, BlockStatus.empty)
+          throw new BlockNotFoundException(blockId.toString)
       }
     }
   }
@@ -856,22 +861,38 @@ private[spark] class BlockManager(
     }

     val startTimeMs = System.currentTimeMillis
-    var blockWasSuccessfullyStored: Boolean = false
+    var exceptionWasThrown: Boolean = true
     val result: Option[T] = try {
       val res = putBody(putBlockInfo)
-      blockWasSuccessfullyStored = res.isEmpty
-      res
-    } finally {
-      if (blockWasSuccessfullyStored) {
+      exceptionWasThrown = false
+      if (res.isEmpty) {
+        // the block was successfully stored
         if (keepReadLock) {
           blockInfoManager.downgradeLock(blockId)
         } else {
spark git commit: [SPARK-17484] Prevent invalid block locations from being reported after put() exceptions
Repository: spark
Updated Branches:
  refs/heads/master a6b818200 -> 1202075c9

[SPARK-17484] Prevent invalid block locations from being reported after put() exceptions

## What changes were proposed in this pull request?

If a BlockManager `put()` call failed after the BlockManagerMaster was notified of a block's availability, then incomplete cleanup logic in a `finally` block would never send a second block status message to inform the master of the block's unavailability. This, in turn, leads to fetch failures and used to be capable of causing complete job failures before #15037 was fixed.

This patch addresses this issue via multiple small changes:

- The `finally` block now calls `removeBlockInternal` when cleaning up from a failed `put()`; in addition to removing the `BlockInfo` entry (which was _all_ that the old cleanup logic did), this code (redundantly) tries to remove the block from the memory and disk stores (as an added layer of defense against bugs lower down in the stack) and optionally notifies the master of block removal (which now happens during exception-triggered cleanup).
- When a BlockManager receives a request for a block that it does not have, it will now notify the master to update its block locations. This ensures that bad metadata pointing to non-existent blocks will eventually be fixed. Note that I could have implemented this logic in the block manager client (rather than in the remote server), but that would introduce the problem of distinguishing between transient and permanent failures; on the server, however, we know definitively that the block isn't present.
- Catch `NonFatal` instead of `Exception` to avoid swallowing `InterruptedException`s thrown from synchronous block replication calls.

This patch depends upon the refactorings in #15036, so that other patch will also have to be backported when backporting this fix.

For more background on this issue, including example logs from a real production failure, see [SPARK-17484](https://issues.apache.org/jira/browse/SPARK-17484).

## How was this patch tested?

Two new regression tests in BlockManagerSuite.

Author: Josh Rosen

Closes #15085 from JoshRosen/SPARK-17484.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1202075c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1202075c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1202075c

Branch: refs/heads/master
Commit: 1202075c95eabba0ffebc170077df798f271a139
Parents: a6b8182
Author: Josh Rosen
Authored: Thu Sep 15 11:54:17 2016 -0700
Committer: Josh Rosen
Committed: Thu Sep 15 11:54:17 2016 -0700

----
 .../org/apache/spark/storage/BlockManager.scala | 37 +++-
 .../spark/storage/BlockManagerSuite.scala       | 34 ++
 2 files changed, 63 insertions(+), 8 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/1202075c/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c172ac2..aa29acf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -283,7 +283,12 @@ private[spark] class BlockManager(
     } else {
       getLocalBytes(blockId) match {
         case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer)
-        case None => throw new BlockNotFoundException(blockId.toString)
+        case None =>
+          // If this block manager receives a request for a block that it doesn't have then it's
+          // likely that the master has outdated block statuses for this block. Therefore, we send
+          // an RPC so that this block is marked as being unavailable from this block manager.
+          reportBlockStatus(blockId, BlockStatus.empty)
+          throw new BlockNotFoundException(blockId.toString)
       }
     }
   }
@@ -859,22 +864,38 @@ private[spark] class BlockManager(
     }

     val startTimeMs = System.currentTimeMillis
-    var blockWasSuccessfullyStored: Boolean = false
+    var exceptionWasThrown: Boolean = true
     val result: Option[T] = try {
       val res = putBody(putBlockInfo)
-      blockWasSuccessfullyStored = res.isEmpty
-      res
-    } finally {
-      if (blockWasSuccessfullyStored) {
+      exceptionWasThrown = false
+      if (res.isEmpty) {
+        // the block was successfully stored
        if (keepReadLock) {
          blockInfoManager.downgradeLock(blockId)
        } else {
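Distilled from the diff above, a minimal hedged sketch of the flag-based cleanup pattern (the helper signature is illustrative; only the flag handling mirrors the commit):

```
// The flag starts true and is cleared only after putBody returns, so the
// finally block can distinguish a failed put() and run the failure-only
// cleanup path (standing in here for removeBlockInternal, which in the
// real code also tells the master the block is gone).
def doPut[T](putBody: => Option[T])(cleanupOnFailure: () => Unit): Option[T] = {
  var exceptionWasThrown = true
  try {
    val res = putBody
    exceptionWasThrown = false
    res
  } finally {
    if (exceptionWasThrown) {
      cleanupOnFailure()
    }
  }
}
```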
spark git commit: [SPARK-17364][SQL] Antlr lexer wrongly treats full qualified identifier as a decimal number token when parsing SQL string
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 abb89c42e -> 0169c2edc

[SPARK-17364][SQL] Antlr lexer wrongly treats full qualified identifier as a decimal number token when parsing SQL string

## What changes were proposed in this pull request?

The Antlr lexer we use to tokenize a SQL string may wrongly tokenize a fully qualified identifier as a decimal number token. For example, table identifier `default.123_table` is wrongly tokenized as

```
default   // Matches lexer rule IDENTIFIER
.123      // Matches lexer rule DECIMAL_VALUE
_TABLE    // Matches lexer rule IDENTIFIER
```

The correct tokenization for `default.123_table` should be:

```
default   // Matches lexer rule IDENTIFIER
.         // Matches a single dot
123_TABLE // Matches lexer rule IDENTIFIER
```

This PR fixes the Antlr grammar so that it tokenizes fully qualified identifiers correctly:

1. Fully qualified table names can be parsed correctly. For example, `select * from database.123_suffix`.
2. Fully qualified column names can be parsed correctly, for example `select a.123_suffix from a`.

### Before change

Case 1: Failed to parse fully qualified column name

```
scala> spark.sql("select a.123_column from a").show
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '.123' expecting {<EOF>, ..., IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 8)

== SQL ==
select a.123_column from a
--------^^^
```

Case 2: Failed to parse fully qualified table name

```
scala> spark.sql("select * from default.123_table")
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '.123' expecting {<EOF>, ..., IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 21)

== SQL ==
select * from default.123_table
---------------------^^^
```

### After Change

Case 1: fully qualified column name, no ParseException thrown

```
scala> spark.sql("select a.123_column from a").show
```

Case 2: fully qualified table name, no ParseException thrown

```
scala> spark.sql("select * from default.123_table")
```

## How was this patch tested?

Unit test.

Author: Sean Zhong

Closes #15006 from clockfly/SPARK-17364.

(cherry picked from commit a6b8182006d0c3dda67c06861067ca78383ecf1b)
Signed-off-by: Herman van Hovell

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0169c2ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0169c2ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0169c2ed

Branch: refs/heads/branch-2.0
Commit: 0169c2edc35ee918b2972f2f4d4e112ccbdcb0c1
Parents: abb89c4
Author: Sean Zhong
Authored: Thu Sep 15 20:53:48 2016 +0200
Committer: Herman van Hovell
Committed: Thu Sep 15 20:54:01 2016 +0200

----
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 44 
 .../catalyst/parser/ExpressionParserSuite.scala | 15 ++-
 .../parser/TableIdentifierParserSuite.scala     | 13 ++
 3 files changed, 63 insertions(+), 9 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/0169c2ed/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 4b47fa3..8b72140 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -16,6 +16,30 @@

 grammar SqlBase;

+@members {
+  /**
+   * Verify whether current token is a valid decimal token (which contains dot).
+   * Returns true if the character that follows the token is not a digit or letter or underscore.
+   *
+   * For example:
+   * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'.
+   * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'.
+   * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'.
+   * For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token because it is folllowed
+   * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+'
+   * which is not a digit or letter or underscore.
+   */
+  public boolean isValidDecimal() {
+    int nextChar = _input.LA(1);
+    if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' ||
+      nextChar == '_') {
+      return false;
+    } else {
+      return true;
+    }
+  }
+}
+
 tokens {
     DELIMITER
 }
@@ -908,23 +932,22 @@ INTEGER_VALUE
     ;

 DECIMAL_VALUE
-    : DIGIT+ '.' DIGIT*
-    | '.' DIGIT+
+    : DECIMAL_DIGITS {isValidDecimal()}?
spark git commit: [SPARK-17364][SQL] Antlr lexer wrongly treats full qualified identifier as a decimal number token when parsing SQL string
Repository: spark
Updated Branches:
  refs/heads/master fe767395f -> a6b818200

[SPARK-17364][SQL] Antlr lexer wrongly treats full qualified identifier as a decimal number token when parsing SQL string

## What changes were proposed in this pull request?

The Antlr lexer we use to tokenize a SQL string may wrongly tokenize a fully qualified identifier as a decimal number token. For example, table identifier `default.123_table` is wrongly tokenized as

```
default   // Matches lexer rule IDENTIFIER
.123      // Matches lexer rule DECIMAL_VALUE
_TABLE    // Matches lexer rule IDENTIFIER
```

The correct tokenization for `default.123_table` should be:

```
default   // Matches lexer rule IDENTIFIER
.         // Matches a single dot
123_TABLE // Matches lexer rule IDENTIFIER
```

This PR fixes the Antlr grammar so that it tokenizes fully qualified identifiers correctly:

1. Fully qualified table names can be parsed correctly. For example, `select * from database.123_suffix`.
2. Fully qualified column names can be parsed correctly, for example `select a.123_suffix from a`.

### Before change

Case 1: Failed to parse fully qualified column name

```
scala> spark.sql("select a.123_column from a").show
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '.123' expecting {<EOF>, ..., IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 8)

== SQL ==
select a.123_column from a
--------^^^
```

Case 2: Failed to parse fully qualified table name

```
scala> spark.sql("select * from default.123_table")
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '.123' expecting {<EOF>, ..., IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 21)

== SQL ==
select * from default.123_table
---------------------^^^
```

### After Change

Case 1: fully qualified column name, no ParseException thrown

```
scala> spark.sql("select a.123_column from a").show
```

Case 2: fully qualified table name, no ParseException thrown

```
scala> spark.sql("select * from default.123_table")
```

## How was this patch tested?

Unit test.

Author: Sean Zhong

Closes #15006 from clockfly/SPARK-17364.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6b81820
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6b81820
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6b81820

Branch: refs/heads/master
Commit: a6b8182006d0c3dda67c06861067ca78383ecf1b
Parents: fe76739
Author: Sean Zhong
Authored: Thu Sep 15 20:53:48 2016 +0200
Committer: Herman van Hovell
Committed: Thu Sep 15 20:53:48 2016 +0200

----
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 44 
 .../catalyst/parser/ExpressionParserSuite.scala | 15 ++-
 .../parser/TableIdentifierParserSuite.scala     | 13 ++
 3 files changed, 63 insertions(+), 9 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b81820/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index b475abd..7023c0c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -16,6 +16,30 @@

 grammar SqlBase;

+@members {
+  /**
+   * Verify whether current token is a valid decimal token (which contains dot).
+   * Returns true if the character that follows the token is not a digit or letter or underscore.
+   *
+   * For example:
+   * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'.
+   * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'.
+   * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'.
+   * For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token because it is folllowed
+   * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+'
+   * which is not a digit or letter or underscore.
+   */
+  public boolean isValidDecimal() {
+    int nextChar = _input.LA(1);
+    if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' ||
+      nextChar == '_') {
+      return false;
+    } else {
+      return true;
+    }
+  }
+}
+
 tokens {
     DELIMITER
 }
@@ -920,23 +944,22 @@ INTEGER_VALUE
     ;

 DECIMAL_VALUE
-    : DIGIT+ '.' DIGIT*
-    | '.' DIGIT+
+    : DECIMAL_DIGITS {isValidDecimal()}?
     ;

 SCIENTIFIC_DECIMAL_VALUE
-    : DIGIT+ ('.' DIGIT*)? EXPONENT
-    | '.' DIGIT+ EXPONENT
+    : DIGIT+
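The key device in the grammar change above is the semantic predicate `{isValidDecimal()}?`: `DECIMAL_VALUE` matches only when the character after the digits-and-dot run is not an uppercase letter, digit, or underscore; otherwise the lexer falls back to emitting a dot plus an `IDENTIFIER`. A standalone hedged sketch of that boundary check (not Spark's code, which lives in the grammar's `@members` block shown in the diff):

```
// Mirrors isValidDecimal() from the diff: the character following a
// candidate decimal token must not be 'A'-'Z', '0'-'9', or '_'.
def isValidDecimal(nextChar: Char): Boolean =
  !(nextChar >= 'A' && nextChar <= 'Z' ||
    nextChar >= '0' && nextChar <= '9' ||
    nextChar == '_')

isValidDecimal('3') // false: "2." followed by '3' is not a complete decimal
isValidDecimal('_') // false: ".123" in default.123_table lexes as dot + identifier
isValidDecimal(' ') // true:  "12.0" followed by a space is a valid decimal
```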
spark git commit: [SPARK-17429][SQL] use ImplicitCastInputTypes with function Length
Repository: spark
Updated Branches:
  refs/heads/master d403562eb -> fe767395f

[SPARK-17429][SQL] use ImplicitCastInputTypes with function Length

## What changes were proposed in this pull request?

```
select length(11);
select length(2.0);
```

These SQL statements currently return errors, but Hive accepts them. This PR supports casting input types implicitly for the function `length`. The correct results are:

```
select length(11)  -- returns 2
select length(2.0) -- returns 3
```

Author: 岑玉海 <261810...@qq.com>
Author: cenyuhai

Closes #15014 from cenyuhai/SPARK-17429.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe767395
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe767395
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe767395

Branch: refs/heads/master
Commit: fe767395ff46ee6236cf53aece85fcd61c0b49d3
Parents: d403562
Author: 岑玉海 <261810...@qq.com>
Authored: Thu Sep 15 20:45:00 2016 +0200
Committer: Herman van Hovell
Committed: Thu Sep 15 20:45:00 2016 +0200

----
 .../sql/catalyst/expressions/stringExpressions.scala       |  2 +-
 .../scala/org/apache/spark/sql/StringFunctionsSuite.scala  | 10 ++
 2 files changed, 7 insertions(+), 5 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/fe767395/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
----
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index a8c23a8..1bcbb6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -1057,7 +1057,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
 @ExpressionDescription(
   usage = "_FUNC_(str | binary) - Returns the length of str or number of bytes in binary data.",
   extended = "> SELECT _FUNC_('Spark SQL');\n 9")
-case class Length(child: Expression) extends UnaryExpression with ExpectsInputTypes {
+case class Length(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
   override def dataType: DataType = IntegerType
   override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(StringType, BinaryType))

http://git-wip-us.apache.org/repos/asf/spark/blob/fe767395/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
----
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
index 1cc7746..bcc2351 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -330,7 +330,8 @@ class StringFunctionsSuite extends QueryTest with SharedSQLContext {
   }

   test("string / binary length function") {
-    val df = Seq(("123", Array[Byte](1, 2, 3, 4), 123)).toDF("a", "b", "c")
+    val df = Seq(("123", Array[Byte](1, 2, 3, 4), 123, 2.0f, 3.015))
+      .toDF("a", "b", "c", "d", "e")
     checkAnswer(
       df.select(length($"a"), length($"b")),
       Row(3, 4))
@@ -339,9 +340,10 @@ class StringFunctionsSuite extends QueryTest with SharedSQLContext {
       df.selectExpr("length(a)", "length(b)"),
       Row(3, 4))

-    intercept[AnalysisException] {
-      df.selectExpr("length(c)") // int type of the argument is unacceptable
-    }
+    checkAnswer(
+      df.selectExpr("length(c)", "length(d)", "length(e)"),
+      Row(3, 3, 5)
+    )
   }

   test("initcap function") {
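A hedged spark-shell sketch of the new behavior (return values follow the examples in the commit message and the updated test):

```
// length() now implicitly casts numeric arguments to string first.
spark.sql("select length(11)").show()  // 2, since "11" has two characters
spark.sql("select length(2.0)").show() // 3, since "2.0" has three characters

// DataFrame equivalent, per the updated StringFunctionsSuite:
import spark.implicits._ // assumes a SparkSession named `spark`
val df = Seq((123, 2.0f, 3.015)).toDF("c", "d", "e")
df.selectExpr("length(c)", "length(d)", "length(e)").show() // 3, 3, 5
```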
spark git commit: [SPARK-17114][SQL] Fix aggregates grouped by literals with empty input
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e77a437d2 -> 62ab53658

[SPARK-17114][SQL] Fix aggregates grouped by literals with empty input

## What changes were proposed in this pull request?

This PR fixes an issue with aggregates that have an empty input and use literals as their grouping keys. These aggregates are currently interpreted as aggregates **without** grouping keys, which triggers the ungrouped code path (which always returns a single row). This PR fixes the `RemoveLiteralFromGroupExpressions` optimizer rule, which changed the semantics of the Aggregate by eliminating all literal grouping keys.

## How was this patch tested?

Added tests to `SQLQueryTestSuite`.

Author: Herman van Hovell

Closes #15101 from hvanhovell/SPARK-17114-3.

(cherry picked from commit d403562eb4b5b1d804909861d3e8b75d8f6323b9)
Signed-off-by: Herman van Hovell

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62ab5365
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62ab5365
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62ab5365

Branch: refs/heads/branch-2.0
Commit: 62ab536588e19293a84004f547ebc316346b869e
Parents: e77a437
Author: Herman van Hovell
Authored: Thu Sep 15 20:24:15 2016 +0200
Committer: Herman van Hovell
Committed: Thu Sep 15 20:24:29 2016 +0200

----
 .../sql/catalyst/optimizer/Optimizer.scala      | 11 -
 .../optimizer/AggregateOptimizeSuite.scala      | 10 +++-
 .../resources/sql-tests/inputs/group-by.sql     | 17 +++
 .../sql-tests/results/group-by.sql.out          | 51 
 4 files changed, 86 insertions(+), 3 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/62ab5365/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e743898..d824c2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1439,9 +1439,16 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] {
  */
 object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case a @ Aggregate(grouping, _, _) =>
+    case a @ Aggregate(grouping, _, _) if grouping.nonEmpty =>
       val newGrouping = grouping.filter(!_.foldable)
-      a.copy(groupingExpressions = newGrouping)
+      if (newGrouping.nonEmpty) {
+        a.copy(groupingExpressions = newGrouping)
+      } else {
+        // All grouping expressions are literals. We should not drop them all, because this can
+        // change the return semantics when the input of the Aggregate is empty (SPARK-17114). We
+        // instead replace this by single, easy to hash/sort, literal expression.
+        a.copy(groupingExpressions = Seq(Literal(0, IntegerType)))
+      }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/62ab5365/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
----
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
index 4c26c18..aecf59a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor

 class AggregateOptimizeSuite extends PlanTest {
-  val conf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
+  val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal = false)
   val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
   val analyzer = new Analyzer(catalog, conf)

@@ -49,6 +49,14 @@ class AggregateOptimizeSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }

+  test("do not remove all grouping expressions if they are all literals") {
+    val query = testRelation.groupBy(Literal("1"), Literal(1) + Literal(2))(sum('b))
+    val optimized = Optimize.execute(analyzer.execute(query))
+    val correctAnswer = analyzer.execute(testRelation.groupBy(Literal(0))(sum('b)))
spark git commit: [SPARK-17547] Ensure temp shuffle data file is cleaned up after error
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a447cd888 -> 8646b84fb

[SPARK-17547] Ensure temp shuffle data file is cleaned up after error

SPARK-8029 (#9610) modified shuffle writers to first stage their data to a temporary file in the same directory as the final destination file and then to atomically rename this temporary file at the end of the write job. However, this change introduced the potential for the temporary output file to be leaked if an exception occurs during the write because the shuffle writers' existing error cleanup code doesn't handle deletion of the temp file.

This patch avoids this potential cause of disk-space leaks by adding `finally` blocks to ensure that temp files are always deleted if they haven't been renamed.

Author: Josh Rosen

Closes #15104 from JoshRosen/cleanup-tmp-data-file-in-shuffle-writer.

(cherry picked from commit 5b8f7377d54f83b93ef2bfc2a01ca65fae6d3032)
Signed-off-by: Josh Rosen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8646b84f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8646b84f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8646b84f

Branch: refs/heads/branch-1.6
Commit: 8646b84fb8ed319e3a998f93de4821c723f7d419
Parents: a447cd8
Author: Josh Rosen
Authored: Thu Sep 15 11:22:58 2016 -0700
Committer: Josh Rosen
Committed: Thu Sep 15 11:24:00 2016 -0700

----
 .../sort/BypassMergeSortShuffleWriter.java      | 10 ++-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 18 +++--
 .../shuffle/IndexShuffleBlockResolver.scala     | 80 +++-
 .../spark/shuffle/sort/SortShuffleWriter.scala  | 14 +++-
 4 files changed, 73 insertions(+), 49 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/8646b84f/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a1a1fb0..80d24b9 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -157,8 +157,14 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     File tmp = Utils.tempFileWith(output);
-    partitionLengths = writePartitionedFile(tmp);
-    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    try {
+      partitionLengths = writePartitionedFile(tmp);
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      }
+    }
     mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8646b84f/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 744c300..d5e16fc 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -209,15 +209,21 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     final File tmp = Utils.tempFileWith(output);
     try {
-      partitionLengths = mergeSpills(spills, tmp);
-    } finally {
-      for (SpillInfo spill : spills) {
-        if (spill.file.exists() && ! spill.file.delete()) {
-          logger.error("Error while deleting spill file {}", spill.file.getPath());
+      try {
+        partitionLengths = mergeSpills(spills, tmp);
+      } finally {
+        for (SpillInfo spill : spills) {
+          if (spill.file.exists() && ! spill.file.delete()) {
+            logger.error("Error while deleting spill file {}", spill.file.getPath());
+          }
         }
       }
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      }
spark git commit: [SPARK-17114][SQL] Fix aggregates grouped by literals with empty input
Repository: spark
Updated Branches:
  refs/heads/master 5b8f7377d -> d403562eb

[SPARK-17114][SQL] Fix aggregates grouped by literals with empty input

## What changes were proposed in this pull request?

This PR fixes an issue with aggregates that have an empty input and use literals as their grouping keys. These aggregates are currently interpreted as aggregates **without** grouping keys, which triggers the ungrouped code path (which always returns a single row). This PR fixes the `RemoveLiteralFromGroupExpressions` optimizer rule, which changed the semantics of the Aggregate by eliminating all literal grouping keys.

## How was this patch tested?

Added tests to `SQLQueryTestSuite`.

Author: Herman van Hovell

Closes #15101 from hvanhovell/SPARK-17114-3.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d403562e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d403562e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d403562e

Branch: refs/heads/master
Commit: d403562eb4b5b1d804909861d3e8b75d8f6323b9
Parents: 5b8f737
Author: Herman van Hovell
Authored: Thu Sep 15 20:24:15 2016 +0200
Committer: Herman van Hovell
Committed: Thu Sep 15 20:24:15 2016 +0200

----
 .../sql/catalyst/optimizer/Optimizer.scala      | 11 -
 .../optimizer/AggregateOptimizeSuite.scala      | 10 +++-
 .../resources/sql-tests/inputs/group-by.sql     | 17 +++
 .../sql-tests/results/group-by.sql.out          | 51 
 4 files changed, 86 insertions(+), 3 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/d403562e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d2f0c97..0df16b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1098,9 +1098,16 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] {
  */
 object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case a @ Aggregate(grouping, _, _) =>
+    case a @ Aggregate(grouping, _, _) if grouping.nonEmpty =>
       val newGrouping = grouping.filter(!_.foldable)
-      a.copy(groupingExpressions = newGrouping)
+      if (newGrouping.nonEmpty) {
+        a.copy(groupingExpressions = newGrouping)
+      } else {
+        // All grouping expressions are literals. We should not drop them all, because this can
+        // change the return semantics when the input of the Aggregate is empty (SPARK-17114). We
+        // instead replace this by single, easy to hash/sort, literal expression.
+        a.copy(groupingExpressions = Seq(Literal(0, IntegerType)))
+      }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d403562e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
----
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
index 4c26c18..aecf59a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor

 class AggregateOptimizeSuite extends PlanTest {
-  val conf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
+  val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal = false)
   val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
   val analyzer = new Analyzer(catalog, conf)

@@ -49,6 +49,14 @@ class AggregateOptimizeSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }

+  test("do not remove all grouping expressions if they are all literals") {
+    val query = testRelation.groupBy(Literal("1"), Literal(1) + Literal(2))(sum('b))
+    val optimized = Optimize.execute(analyzer.execute(query))
+    val correctAnswer = analyzer.execute(testRelation.groupBy(Literal(0))(sum('b)))
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("Remove
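To see the semantic difference the rule must now preserve, a hedged spark-shell sketch (the temp view and data are illustrative):

```
// On an empty input, a grouped aggregate returns zero rows while an
// ungrouped aggregate always returns exactly one row. Dropping *all*
// literal grouping keys used to silently turn the first into the second.
spark.range(0).toDF("b").createOrReplaceTempView("t") // zero-row input

spark.sql("select count(b) from t").show()              // one row: 0
spark.sql("select count(b) from t group by 1.1").show() // zero rows after this fix
```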
spark git commit: [SPARK-17547] Ensure temp shuffle data file is cleaned up after error
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a09c258c9 -> e77a437d2

[SPARK-17547] Ensure temp shuffle data file is cleaned up after error

SPARK-8029 (#9610) modified shuffle writers to first stage their data to a temporary file in the same directory as the final destination file and then to atomically rename this temporary file at the end of the write job. However, this change introduced the potential for the temporary output file to be leaked if an exception occurs during the write because the shuffle writers' existing error cleanup code doesn't handle deletion of the temp file.

This patch avoids this potential cause of disk-space leaks by adding `finally` blocks to ensure that temp files are always deleted if they haven't been renamed.

Author: Josh Rosen

Closes #15104 from JoshRosen/cleanup-tmp-data-file-in-shuffle-writer.

(cherry picked from commit 5b8f7377d54f83b93ef2bfc2a01ca65fae6d3032)
Signed-off-by: Josh Rosen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e77a437d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e77a437d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e77a437d

Branch: refs/heads/branch-2.0
Commit: e77a437d292ecda66163a895427d62e4f72e2a25
Parents: a09c258
Author: Josh Rosen
Authored: Thu Sep 15 11:22:58 2016 -0700
Committer: Josh Rosen
Committed: Thu Sep 15 11:23:17 2016 -0700

----
 .../sort/BypassMergeSortShuffleWriter.java      | 10 ++-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 18 +++--
 .../shuffle/IndexShuffleBlockResolver.scala     | 80 +++-
 .../spark/shuffle/sort/SortShuffleWriter.scala  | 14 +++-
 4 files changed, 73 insertions(+), 49 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/e77a437d/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 0e9defe..601dd6e 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -156,8 +156,14 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     File tmp = Utils.tempFileWith(output);
-    partitionLengths = writePartitionedFile(tmp);
-    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    try {
+      partitionLengths = writePartitionedFile(tmp);
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      }
+    }
     mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e77a437d/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 44e6aa7..c08a5d4 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -207,15 +207,21 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     final File tmp = Utils.tempFileWith(output);
     try {
-      partitionLengths = mergeSpills(spills, tmp);
-    } finally {
-      for (SpillInfo spill : spills) {
-        if (spill.file.exists() && ! spill.file.delete()) {
-          logger.error("Error while deleting spill file {}", spill.file.getPath());
+      try {
+        partitionLengths = mergeSpills(spills, tmp);
+      } finally {
+        for (SpillInfo spill : spills) {
+          if (spill.file.exists() && ! spill.file.delete()) {
+            logger.error("Error while deleting spill file {}", spill.file.getPath());
+          }
         }
       }
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      }
spark git commit: [SPARK-17547] Ensure temp shuffle data file is cleaned up after error
Repository: spark
Updated Branches:
  refs/heads/master 0ad8eeb4d -> 5b8f7377d

[SPARK-17547] Ensure temp shuffle data file is cleaned up after error

SPARK-8029 (#9610) modified shuffle writers to first stage their data to a temporary file in the same directory as the final destination file and then to atomically rename this temporary file at the end of the write job. However, this change introduced the potential for the temporary output file to be leaked if an exception occurs during the write because the shuffle writers' existing error cleanup code doesn't handle deletion of the temp file.

This patch avoids this potential cause of disk-space leaks by adding `finally` blocks to ensure that temp files are always deleted if they haven't been renamed.

Author: Josh Rosen

Closes #15104 from JoshRosen/cleanup-tmp-data-file-in-shuffle-writer.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b8f7377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b8f7377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b8f7377

Branch: refs/heads/master
Commit: 5b8f7377d54f83b93ef2bfc2a01ca65fae6d3032
Parents: 0ad8eeb
Author: Josh Rosen
Authored: Thu Sep 15 11:22:58 2016 -0700
Committer: Josh Rosen
Committed: Thu Sep 15 11:22:58 2016 -0700

----
 .../sort/BypassMergeSortShuffleWriter.java      | 10 ++-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 18 +++--
 .../shuffle/IndexShuffleBlockResolver.scala     | 80 +++-
 .../spark/shuffle/sort/SortShuffleWriter.scala  | 14 +++-
 4 files changed, 73 insertions(+), 49 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/5b8f7377/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 0fcc56d..4a15559 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -160,8 +160,14 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     File tmp = Utils.tempFileWith(output);
-    partitionLengths = writePartitionedFile(tmp);
-    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    try {
+      partitionLengths = writePartitionedFile(tmp);
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      }
+    }
     mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5b8f7377/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 63d376b..f235c43 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -210,15 +210,21 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     final File tmp = Utils.tempFileWith(output);
     try {
-      partitionLengths = mergeSpills(spills, tmp);
-    } finally {
-      for (SpillInfo spill : spills) {
-        if (spill.file.exists() && ! spill.file.delete()) {
-          logger.error("Error while deleting spill file {}", spill.file.getPath());
+      try {
+        partitionLengths = mergeSpills(spills, tmp);
+      } finally {
+        for (SpillInfo spill : spills) {
+          if (spill.file.exists() && ! spill.file.delete()) {
+            logger.error("Error while deleting spill file {}", spill.file.getPath());
+          }
         }
       }
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      }
     }
-    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
     mapStatus =
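The underlying pattern, as a minimal hedged Scala sketch (the helper and its error handling are illustrative, not Spark's `Utils.tempFileWith`; only the stage-then-rename-then-cleanup shape mirrors the patch):

```
import java.io.File
import java.util.UUID

// Stage to a temp file in the same directory, "commit" by rename, and
// always delete the temp file in finally unless the rename consumed it --
// exactly the leak the added finally blocks close.
def writeAtomically(output: File)(writeData: File => Unit): Unit = {
  val tmp = new File(output.getParentFile, output.getName + "." + UUID.randomUUID())
  try {
    writeData(tmp)
    if (!tmp.renameTo(output)) { // the index/commit step in the diff plays this role
      throw new java.io.IOException(s"failed to commit ${output.getAbsolutePath}")
    }
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      System.err.println(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
```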
spark git commit: [SPARK-17379][BUILD] Upgrade netty-all to 4.0.41 final for bug fixes
Repository: spark
Updated Branches:
  refs/heads/master b47927814 -> 0ad8eeb4d

[SPARK-17379][BUILD] Upgrade netty-all to 4.0.41 final for bug fixes

## What changes were proposed in this pull request?

Upgrade netty-all to the latest release in the 4.0.x line, which is 4.0.41; it mentions several bug fixes and performance improvements we may find useful, see netty.io/news/2016/08/29/4-0-41-Final-4-1-5-Final.html. I initially tried to use 4.1.5 but noticed it's not backwards compatible.

## How was this patch tested?

Existing unit tests against branch-1.6 and branch-2.0 using IBM Java 8 on Intel, Power and Z architectures.

Author: Adam Roberts

Closes #14961 from a-roberts/netty.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ad8eeb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ad8eeb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ad8eeb4

Branch: refs/heads/master
Commit: 0ad8eeb4d365c2fff5715ec22fbcf4c69c3340fd
Parents: b479278
Author: Adam Roberts
Authored: Thu Sep 15 10:40:10 2016 -0700
Committer: Shixiong Zhu
Committed: Thu Sep 15 10:40:10 2016 -0700

----
 .../main/java/org/apache/spark/network/util/TransportConf.java | 5 +
 dev/deps/spark-deps-hadoop-2.2                                  | 2 +-
 dev/deps/spark-deps-hadoop-2.3                                  | 2 +-
 dev/deps/spark-deps-hadoop-2.4                                  | 2 +-
 dev/deps/spark-deps-hadoop-2.6                                  | 2 +-
 dev/deps/spark-deps-hadoop-2.7                                  | 2 +-
 pom.xml                                                         | 2 +-
 7 files changed, 11 insertions(+), 6 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/0ad8eeb4/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
----
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 0efc400..7d5baa9 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -23,6 +23,11 @@ import com.google.common.primitives.Ints;
 /**
  * A central location that tracks all the settings we expose to users.
  */
 public class TransportConf {
+
+  static {
+    // Set this due to Netty PR #5661 for Netty 4.0.37+ to work
+    System.setProperty("io.netty.maxDirectMemory", "0");
+  }

   private final String SPARK_NETWORK_IO_MODE_KEY;
   private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;

http://git-wip-us.apache.org/repos/asf/spark/blob/0ad8eeb4/dev/deps/spark-deps-hadoop-2.2
----
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 81adde6..a7259e2 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -124,7 +124,7 @@ metrics-json-3.1.2.jar
 metrics-jvm-3.1.2.jar
 minlog-1.3.0.jar
 netty-3.8.0.Final.jar
-netty-all-4.0.29.Final.jar
+netty-all-4.0.41.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0ad8eeb4/dev/deps/spark-deps-hadoop-2.3
----
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 75ab628..6986ab5 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -131,7 +131,7 @@ metrics-jvm-3.1.2.jar
 minlog-1.3.0.jar
 mx4j-3.0.2.jar
 netty-3.8.0.Final.jar
-netty-all-4.0.29.Final.jar
+netty-all-4.0.41.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0ad8eeb4/dev/deps/spark-deps-hadoop-2.4
----
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 897d802..75cccb3 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -131,7 +131,7 @@ metrics-jvm-3.1.2.jar
 minlog-1.3.0.jar
 mx4j-3.0.2.jar
 netty-3.8.0.Final.jar
-netty-all-4.0.29.Final.jar
+netty-all-4.0.41.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0ad8eeb4/dev/deps/spark-deps-hadoop-2.6
----
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index f95ddb1..ef7b8a7 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -139,7 +139,7 @@
spark git commit: [SPARK-17451][CORE] CoarseGrainedExecutorBackend should inform driver before self-kill
Repository: spark
Updated Branches:
  refs/heads/master 2ad276954 -> b47927814

[SPARK-17451][CORE] CoarseGrainedExecutorBackend should inform driver before self-kill

## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-17451

`CoarseGrainedExecutorBackend` exits the JVM in some failure cases. While the exit itself is not a problem, the driver UI captures no specific reason for it. In this PR, I am adding functionality to `exitExecutor` to notify the driver that the executor is exiting.

## How was this patch tested?

Ran the change in a test environment and took down the shuffle service before the executor could register with it. In the driver logs, where the job failure reason is mentioned (i.e. `Job aborted due to stage ...`), it now gives the correct reason:

Before: `ExecutorLostFailure (executor Z exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.`

After: `ExecutorLostFailure (executor Z exited caused by one of the running tasks) Reason: Unable to create executor due to java.util.concurrent.TimeoutException: Timeout waiting for task.`

Author: Tejas Patil

Closes #15013 from tejasapatil/SPARK-17451_inform_driver.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4792781
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4792781
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4792781

Branch: refs/heads/master
Commit: b479278142728eb003b9ee466fab0e8d6ec4b13d
Parents: 2ad2769
Author: Tejas Patil
Authored: Thu Sep 15 10:23:41 2016 -0700
Committer: Shixiong Zhu
Committed: Thu Sep 15 10:23:41 2016 -0700

----
 .../executor/CoarseGrainedExecutorBackend.scala | 26 +++-
 .../org/apache/spark/storage/BlockManager.scala |  3 +++
 2 files changed, 23 insertions(+), 6 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/b4792781/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 391b97d..7eec4ae 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -65,7 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       case Success(msg) =>
         // Always receive `true`. Just ignore it
       case Failure(e) =>
-        exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
+        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
     }(ThreadUtils.sameThread)
   }
@@ -129,7 +129,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     if (stopping.get()) {
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
-      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
+      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null,
+        notifyDriver = false)
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -148,12 +149,25 @@ private[spark] class CoarseGrainedExecutorBackend(
    * executor exits differently. For e.g. when an executor goes down,
    * back-end may not want to take the parent process down.
    */
-  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = {
+  protected def exitExecutor(code: Int,
+      reason: String,
+      throwable: Throwable = null,
+      notifyDriver: Boolean = true) = {
+    val message = "Executor self-exiting due to : " + reason
     if (throwable != null) {
-      logError(reason, throwable)
+      logError(message, throwable)
     } else {
-      logError(reason)
+      logError(message)
     }
+
+    if (notifyDriver && driver.nonEmpty) {
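A minimal hedged sketch of the notify-before-exit shape (the `notify` callback is a hypothetical stand-in for the RPC to the driver; only the message prefix and the `notifyDriver` flag mirror the diff):

```
// Report the failure reason to the driver before exiting the JVM, so the
// driver UI can show a specific cause instead of a generic disassociation.
def exitExecutor(code: Int, reason: String, notify: String => Unit,
    notifyDriver: Boolean = true): Unit = {
  val message = "Executor self-exiting due to : " + reason
  System.err.println(message)
  if (notifyDriver) {
    // best effort: a failed notification must not prevent the exit below
    try notify(message) catch { case scala.util.control.NonFatal(_) => () }
  }
  System.exit(code)
}
```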
spark git commit: [SPARK-17317][SPARKR] Add SparkR vignette to branch 2.0
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5c2bc8360 -> a09c258c9

[SPARK-17317][SPARKR] Add SparkR vignette to branch 2.0

## What changes were proposed in this pull request?

This PR adds the SparkR vignette to branch 2.0; it serves as a friendly guide through the functionality provided by SparkR.

## How was this patch tested?

R unit test.

Author: junyangq
Author: Shivaram Venkataraman
Author: Junyang Qian

Closes #15100 from junyangq/SPARKR-vignette-2.0.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a09c258c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a09c258c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a09c258c

Branch: refs/heads/branch-2.0
Commit: a09c258c9a97e701fa7650cc0651e3c6a7a1cab9
Parents: 5c2bc83
Author: junyangq
Authored: Thu Sep 15 10:00:36 2016 -0700
Committer: Shivaram Venkataraman
Committed: Thu Sep 15 10:00:36 2016 -0700

----
 R/create-docs.sh                     |  11 +-
 R/pkg/vignettes/sparkr-vignettes.Rmd | 643 ++
 2 files changed, 652 insertions(+), 2 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/a09c258c/R/create-docs.sh
----
diff --git a/R/create-docs.sh b/R/create-docs.sh
index d2ae160..0dfba22 100755
--- a/R/create-docs.sh
+++ b/R/create-docs.sh
@@ -17,11 +17,13 @@
 # limitations under the License.
 #

-# Script to create API docs for SparkR
-# This requires `devtools` and `knitr` to be installed on the machine.
+# Script to create API docs and vignettes for SparkR
+# This requires `devtools`, `knitr` and `rmarkdown` to be installed on the machine.

 # After running this script the html docs can be found in
 # $SPARK_HOME/R/pkg/html
+# The vignettes can be found in
+# $SPARK_HOME/R/pkg/vignettes/sparkr_vignettes.html

 set -o pipefail
 set -e
@@ -43,4 +45,9 @@ Rscript -e 'libDir <- "../../lib"; library(SparkR, lib.loc=libDir); library(knit

 popd

+# render creates SparkR vignettes
+Rscript -e 'library(rmarkdown); paths <- .libPaths(); .libPaths(c("lib", paths)); Sys.setenv(SPARK_HOME=tools::file_path_as_absolute("..")); render("pkg/vignettes/sparkr-vignettes.Rmd"); .libPaths(paths)'
+
+find pkg/vignettes/. -not -name '.' -not -name '*.Rmd' -not -name '*.md' -not -name '*.pdf' -not -name '*.html' -delete
+
 popd

http://git-wip-us.apache.org/repos/asf/spark/blob/a09c258c/R/pkg/vignettes/sparkr-vignettes.Rmd
----
diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd
new file mode 100644
index 000..5156c9e
--- /dev/null
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -0,0 +1,643 @@
+---
+title: "SparkR - Practical Guide"
+output:
+  html_document:
+    theme: united
+    toc: true
+    toc_depth: 4
+    toc_float: true
+    highlight: textmate
+---
+
+## Overview
+
+SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. With Spark `r packageVersion("SparkR")`, SparkR provides a distributed data frame implementation that supports data processing operations like selection, filtering, aggregation etc. and distributed machine learning using [MLlib](http://spark.apache.org/mllib/).
+
+## Getting Started
+
+We begin with an example running on the local machine and provide an overview of the use of SparkR: data ingestion, data processing and machine learning.
+
+First, let's load and attach the package.
+```{r, message=FALSE}
+library(SparkR)
+```
+
+`SparkSession` is the entry point into SparkR which connects your R program to a Spark cluster. You can create a `SparkSession` using `sparkR.session` and pass in options such as the application name, any Spark packages depended on, etc.
+
+We use default settings in which it runs in local mode. It auto downloads Spark package in the background if no previous installation is found. For more details about setup, see [Spark Session](#SetupSparkSession).
+
+```{r, message=FALSE}
+sparkR.session()
+```
+
+The operations in SparkR are centered around an R class called `SparkDataFrame`. It is a distributed collection of data organized into named columns, which is conceptually equivalent to a table in a relational database or a data frame in R, but with richer optimizations under the hood.
+
+`SparkDataFrame` can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing local R data frames. For example, we create a `SparkDataFrame` from a local R data frame,
+
+```{r}
+cars <- cbind(model = rownames(mtcars), mtcars)
spark git commit: [SPARK-17406][BUILD][HOTFIX] MiMa excludes fix
Repository: spark
Updated Branches:
  refs/heads/master 71a65825c -> 2ad276954


[SPARK-17406][BUILD][HOTFIX] MiMa excludes fix

## What changes were proposed in this pull request?

Following https://github.com/apache/spark/pull/14969, for some reason the MiMa excludes weren't complete, but still passed the PR builder. This adds 3 more excludes from https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.2/1749/consoleFull

It also moves the excludes to their own Seq in the build, as they probably should have been.

Even though this is merged to 2.1.x / master only, I left the exclude in for 2.0.x in case we back port. It's a private API, so it is always a false positive.

## How was this patch tested?

Jenkins build

Author: Sean Owen

Closes #15110 from srowen/SPARK-17406.2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ad27695
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ad27695
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ad27695

Branch: refs/heads/master
Commit: 2ad276954858b0a7b3f442b9e440c72cbb1610e2
Parents: 71a6582
Author: Sean Owen
Authored: Thu Sep 15 13:54:41 2016 +0100
Committer: Sean Owen
Committed: Thu Sep 15 13:54:41 2016 +0100

--
 project/MimaExcludes.scala | 29 +
 1 file changed, 17 insertions(+), 12 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/2ad27695/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 37fff2e..1bdcf9a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -426,18 +426,6 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatusListener.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.scheduler.BatchInfo.streamIdToNumRecords"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.storageStatusList"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorIdToData"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksActive"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksComplete"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputRecords"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleRead"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksFailed"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleWrite"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToDuration"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputBytes"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToLogUrls"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputBytes"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputRecords"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.storage.StorageListener.storageStatusList"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ExceptionFailure.apply"),
@@ -807,6 +795,23 @@ object MimaExcludes {
       // SPARK-17096: Improve exception string reported through the StreamingQueryListener
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.stackTrace"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.this")
+    ) ++ Seq(
+      // SPARK-17406 limit timeline executor events
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorIdToData"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksActive"),
+
spark git commit: [SPARK-17536][SQL] Minor performance improvement to JDBC batch inserts
Repository: spark
Updated Branches:
  refs/heads/master ad79fc0a8 -> 71a65825c


[SPARK-17536][SQL] Minor performance improvement to JDBC batch inserts

## What changes were proposed in this pull request?

Optimize a while loop during batch inserts: the per-row field count is loop-invariant, so compute it once before iterating over the rows instead of on every iteration.

## How was this patch tested?

Unit tests were done, specifically `mvn test` for sql.

Author: John Muller

Closes #15098 from blue666man/SPARK-17536.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71a65825
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71a65825
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71a65825

Branch: refs/heads/master
Commit: 71a65825c5d5d0886ac3e11f9945cfcb39573ac3
Parents: ad79fc0
Author: John Muller
Authored: Thu Sep 15 10:00:28 2016 +0100
Committer: Sean Owen
Committed: Thu Sep 15 10:00:28 2016 +0100

--
 .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/71a65825/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 132472a..b09fd51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -590,12 +590,12 @@ object JdbcUtils extends Logging {
     val stmt = insertStatement(conn, table, rddSchema, dialect)
     val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
       .map(makeSetter(conn, dialect, _)).toArray
+    val numFields = rddSchema.fields.length

     try {
       var rowCount = 0
       while (iterator.hasNext) {
         val row = iterator.next()
-        val numFields = rddSchema.fields.length
         var i = 0
         while (i < numFields) {
           if (row.isNullAt(i)) {
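A minimal, self-contained sketch of the hoist, with `Field` and `Schema` as illustrative stand-ins for Spark's schema types rather than the real ones:

```scala
// Sketch of the loop-invariant hoist: the field count does not change from
// row to row, so it is computed once per partition, outside the hot loop.
case class Field(name: String)
case class Schema(fields: Array[Field])

def writeRows(rows: Iterator[Array[Any]], rddSchema: Schema): Unit = {
  val numFields = rddSchema.fields.length  // hoisted: invariant across all rows
  while (rows.hasNext) {
    val row = rows.next()
    var i = 0
    while (i < numFields) {
      // in the real code, setters(i) binds row(i) onto the prepared statement
      i += 1
    }
  }
}
```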
spark git commit: [SPARK-17406][WEB UI] limit timeline executor events
Repository: spark
Updated Branches:
  refs/heads/master 647ee05e5 -> ad79fc0a8


[SPARK-17406][WEB UI] limit timeline executor events

## What changes were proposed in this pull request?

The job page will be too slow to open when there are thousands of executor events (added or removed). I found that in the ExecutorsTab file, `executorIdToData` never removes elements, so it grows over time. Before this PR, it looks like [timeline1.png](https://issues.apache.org/jira/secure/attachment/12827112/timeline1.png). After this PR, it looks like [timeline2.png](https://issues.apache.org/jira/secure/attachment/12827113/timeline2.png) (the number of executor events to display is now configurable).

Author: cenyuhai

Closes #14969 from cenyuhai/SPARK-17406.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad79fc0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad79fc0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad79fc0a

Branch: refs/heads/master
Commit: ad79fc0a8407a950a03869f2f8cdc3ed0bf13875
Parents: 647ee05
Author: cenyuhai
Authored: Thu Sep 15 09:58:53 2016 +0100
Committer: Sean Owen
Committed: Thu Sep 15 09:58:53 2016 +0100

--
 .../apache/spark/ui/exec/ExecutorsPage.scala    |  41 +++
 .../org/apache/spark/ui/exec/ExecutorsTab.scala | 112 +++
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  |  66 +--
 .../apache/spark/ui/jobs/ExecutorTable.scala    |   3 +-
 .../org/apache/spark/ui/jobs/JobPage.scala      |  67 ++-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   4 +-
 .../scala/org/apache/spark/ui/jobs/UIData.scala |   5 -
 project/MimaExcludes.scala                      |  12 ++
 8 files changed, 162 insertions(+), 148 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ad79fc0a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 982e891..7953d77 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -17,14 +17,12 @@

 package org.apache.spark.ui.exec

-import java.net.URLEncoder
 import javax.servlet.http.HttpServletRequest

 import scala.xml.Node

 import org.apache.spark.status.api.v1.ExecutorSummary
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.{UIUtils, WebUIPage}

 // This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive
 private[ui] case class ExecutorSummaryInfo(
@@ -83,18 +81,7 @@ private[spark] object ExecutorsPage {
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed
-    val totalCores = listener.executorToTotalCores.getOrElse(execId, 0)
-    val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0)
-    val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
-    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
-    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
-    val totalTasks = activeTasks + failedTasks + completedTasks
-    val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
-    val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L)
-    val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
-    val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
-    val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
-    val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
+    val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId))

     new ExecutorSummary(
       execId,
@@ -103,19 +90,19 @@ private[spark] object ExecutorsPage {
       rddBlocks,
       memUsed,
       diskUsed,
-      totalCores,
-      maxTasks,
-      activeTasks,
-      failedTasks,
-      completedTasks,
-      totalTasks,
-      totalDuration,
-      totalGCTime,
-      totalInputBytes,
-      totalShuffleRead,
-      totalShuffleWrite,
+      taskSummary.totalCores,
+      taskSummary.tasksMax,
+      taskSummary.tasksActive,
+      taskSummary.tasksFailed,
+      taskSummary.tasksComplete,
+      taskSummary.tasksActive + taskSummary.tasksFailed + taskSummary.tasksComplete,
+      taskSummary.duration,
+      taskSummary.jvmGCTime,
+      taskSummary.inputBytes,
+      taskSummary.shuffleRead,
+      taskSummary.shuffleWrite,
       maxMem,
-
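The shape of this refactor, replacing many parallel per-executor maps with a single map of summary objects, can be sketched in a few lines. The field set below is illustrative, not the patch's full `ExecutorTaskSummary`:

```scala
import scala.collection.mutable

// One mutable summary per executor, instead of a dozen parallel HashMaps
// (executorToTasksActive, executorToDuration, ...) keyed by the same id.
case class ExecutorTaskSummary(
    executorId: String,
    var tasksActive: Int = 0,
    var tasksFailed: Int = 0,
    var tasksComplete: Int = 0,
    var duration: Long = 0L)

val executorToTaskSummary = mutable.LinkedHashMap[String, ExecutorTaskSummary]()

def onTaskEnd(execId: String, succeeded: Boolean, time: Long): Unit = {
  val s = executorToTaskSummary.getOrElseUpdate(execId, ExecutorTaskSummary(execId))
  s.tasksActive = math.max(s.tasksActive - 1, 0)
  if (succeeded) s.tasksComplete += 1 else s.tasksFailed += 1
  s.duration += time
}
```

Besides making eviction of old entries possible (one `remove` per executor instead of one per map), a single map keeps all per-executor counters consistent with each other by construction.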
spark git commit: [SPARK-17521] Error when I use sparkContext.makeRDD(Seq())
Repository: spark
Updated Branches:
  refs/heads/master f893e2625 -> 647ee05e5


[SPARK-17521] Error when I use sparkContext.makeRDD(Seq())

## What changes were proposed in this pull request?

When I use sc.makeRDD as below:

```
val data3 = sc.makeRDD(Seq())
println(data3.partitions.length)
```

I get an error:

Exception in thread "main" java.lang.IllegalArgumentException: Positive number of slices required

We can fix this bug by modifying the last line to guard against an empty seq:

```
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
  assertNotStopped()
  val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
  new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, defaultParallelism), indexToPrefs)
}
```

## How was this patch tested?

Manual tests.

Author: codlife <1004910...@qq.com>
Author: codlife

Closes #15077 from codlife/master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/647ee05e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/647ee05e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/647ee05e

Branch: refs/heads/master
Commit: 647ee05e5815bde361662a9286ac602c44b4d4e6
Parents: f893e26
Author: codlife <1004910...@qq.com>
Authored: Thu Sep 15 09:38:13 2016 +0100
Committer: Sean Owen
Committed: Thu Sep 15 09:38:13 2016 +0100

--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/647ee05e/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e32e4aa..35b6334 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -795,7 +795,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
     assertNotStopped()
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
-    new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
+    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
   }

 /**
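A quick check of the fixed behavior, as a sketch that assumes a live `SparkContext` named `sc`: the location-aware `makeRDD` overload now yields a single empty partition for an empty input instead of throwing.

```scala
// The explicit element type picks the location-aware makeRDD overload
// (Seq[(T, Seq[String])]) that this patch touches.
val empty = sc.makeRDD(Seq.empty[(Int, Seq[String])])
assert(empty.partitions.length == 1)  // was: IllegalArgumentException
assert(empty.count() == 0)
```

Note the merged fix uses `math.max(seq.size, 1)` rather than the `defaultParallelism` floor proposed in the description, so a non-empty seq still gets exactly one partition per element.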
spark git commit: [SPARK-17521] Error when I use sparkContext.makeRDD(Seq())
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 bb2bdb440 -> 5c2bc8360


[SPARK-17521] Error when I use sparkContext.makeRDD(Seq())

## What changes were proposed in this pull request?

When I use sc.makeRDD as below:

```
val data3 = sc.makeRDD(Seq())
println(data3.partitions.length)
```

I get an error:

Exception in thread "main" java.lang.IllegalArgumentException: Positive number of slices required

We can fix this bug by modifying the last line to guard against an empty seq:

```
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
  assertNotStopped()
  val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
  new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, defaultParallelism), indexToPrefs)
}
```

## How was this patch tested?

Manual tests.

Author: codlife <1004910...@qq.com>
Author: codlife

Closes #15077 from codlife/master.

(cherry picked from commit 647ee05e5815bde361662a9286ac602c44b4d4e6)
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c2bc836
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c2bc836
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c2bc836

Branch: refs/heads/branch-2.0
Commit: 5c2bc8360019fb08e2e62e50bb261f7ce19b231e
Parents: bb2bdb4
Author: codlife <1004910...@qq.com>
Authored: Thu Sep 15 09:38:13 2016 +0100
Committer: Sean Owen
Committed: Thu Sep 15 09:38:22 2016 +0100

--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5c2bc836/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 71511b8..214758f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -788,7 +788,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
     assertNotStopped()
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
-    new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
+    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
   }

 /**
spark git commit: [SPARK-17524][TESTS] Use specified spark.buffer.pageSize
Repository: spark
Updated Branches:
  refs/heads/master d15b4f90e -> f893e2625


[SPARK-17524][TESTS] Use specified spark.buffer.pageSize

## What changes were proposed in this pull request?

This PR has the appendRowUntilExceedingPageSize test in RowBasedKeyValueBatchSuite use whatever spark.buffer.pageSize value a user has specified, to prevent a test failure for anyone testing Apache Spark on a box with a reduced page size. The test is currently hardcoded to use the default page size, which is 64 MB, so this minor PR is a test improvement.

## How was this patch tested?

Existing unit tests with a 1 MB page size and with 64 MB (the default) page size.

Author: Adam Roberts

Closes #15079 from a-roberts/patch-5.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f893e262
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f893e262
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f893e262

Branch: refs/heads/master
Commit: f893e262500e2f183de88e984300dd5b085e1f71
Parents: d15b4f9
Author: Adam Roberts
Authored: Thu Sep 15 09:37:12 2016 +0100
Committer: Sean Owen
Committed: Thu Sep 15 09:37:12 2016 +0100

--
 .../sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f893e262/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
--
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
index 0dd129c..fb3dbe8 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
@@ -338,15 +338,17 @@ public class RowBasedKeyValueBatchSuite {

   @Test
   public void appendRowUntilExceedingPageSize() throws Exception {
+    // Use default size or spark.buffer.pageSize if specified
+    int pageSizeToUse = (int) memoryManager.pageSizeBytes();
     RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
-        valueSchema, taskMemoryManager, 64 * 1024 * 1024); //enough capacity
+        valueSchema, taskMemoryManager, pageSizeToUse); //enough capacity
     try {
       UnsafeRow key = makeKeyRow(1, "A");
       UnsafeRow value = makeValueRow(1, 1);
       int recordLength = 8 + key.getSizeInBytes() + value.getSizeInBytes() + 8;
       int totalSize = 4;
       int numRows = 0;
-      while (totalSize + recordLength < 64 * 1024 * 1024) { // default page size
+      while (totalSize + recordLength < pageSizeToUse) {
         appendRow(batch, key, value);
         totalSize += recordLength;
         numRows++;
spark git commit: [SPARK-17507][ML][MLLIB] check weight vector size in ANN
Repository: spark
Updated Branches:
  refs/heads/master 6a6adb167 -> d15b4f90e


[SPARK-17507][ML][MLLIB] check weight vector size in ANN

## What changes were proposed in this pull request?

As the TODO described, check the weight vector size and throw an exception if it is wrong.

## How was this patch tested?

Existing tests.

Author: WeichenXu

Closes #15060 from WeichenXu123/check_input_weight_size_of_ann.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d15b4f90
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d15b4f90
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d15b4f90

Branch: refs/heads/master
Commit: d15b4f90e64f7ec5cf14c7c57d2cb4234c3ce677
Parents: 6a6adb1
Author: WeichenXu
Authored: Thu Sep 15 09:30:15 2016 +0100
Committer: Sean Owen
Committed: Thu Sep 15 09:30:15 2016 +0100

--
 mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala | 10 --
 1 file changed, 4 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d15b4f90/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
index 88909a9..e7e0dae 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
@@ -545,7 +545,9 @@ private[ann] object FeedForwardModel {
    * @return model
    */
   def apply(topology: FeedForwardTopology, weights: Vector): FeedForwardModel = {
-    // TODO: check that weights size is equal to sum of layers sizes
+    val expectedWeightSize = topology.layers.map(_.weightSize).sum
+    require(weights.size == expectedWeightSize,
+      s"Expected weight vector of size ${expectedWeightSize} but got size ${weights.size}.")
     new FeedForwardModel(weights, topology)
   }

@@ -559,11 +561,7 @@ private[ann] object FeedForwardModel {
   def apply(topology: FeedForwardTopology, seed: Long = 11L): FeedForwardModel = {
     val layers = topology.layers
     val layerModels = new Array[LayerModel](layers.length)
-    var totalSize = 0
-    for (i <- 0 until topology.layers.length) {
-      totalSize += topology.layers(i).weightSize
-    }
-    val weights = BDV.zeros[Double](totalSize)
+    val weights = BDV.zeros[Double](topology.layers.map(_.weightSize).sum)
     var offset = 0
     val random = new XORShiftRandom(seed)
     for (i <- 0 until layers.length) {
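The fail-fast check generalizes beyond ANN: validate a flattened parameter vector against the sum of per-layer sizes before constructing anything. A self-contained sketch with an illustrative `Layer` type (not Spark's):

```scala
// Illustrative stand-in for a layer that knows how many weights it owns.
case class Layer(weightSize: Int)

def validateWeights(layers: Seq[Layer], weights: Array[Double]): Unit = {
  val expected = layers.map(_.weightSize).sum
  require(weights.length == expected,
    s"Expected weight vector of size $expected but got size ${weights.length}.")
}

// validateWeights(Seq(Layer(6), Layer(3)), new Array[Double](9)) passes, while
// an 8-element array now fails immediately with a clear message instead of
// producing a silently mis-sliced model later.
```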
spark git commit: [SPARK-17440][SPARK-17441] Fixed Multiple Bugs in ALTER TABLE
Repository: spark
Updated Branches:
  refs/heads/master bb3229436 -> 6a6adb167


[SPARK-17440][SPARK-17441] Fixed Multiple Bugs in ALTER TABLE

### What changes were proposed in this pull request?

For the following `ALTER TABLE` DDL, we should issue an exception when the target table is a `VIEW`:

```SQL
ALTER TABLE viewName SET LOCATION '/path/to/your/lovely/heart'
ALTER TABLE viewName SET SERDE 'whatever'
ALTER TABLE viewName SET SERDEPROPERTIES ('x' = 'y')
ALTER TABLE viewName PARTITION (a=1, b=2) SET SERDEPROPERTIES ('x' = 'y')
ALTER TABLE viewName ADD IF NOT EXISTS PARTITION (a='4', b='8')
ALTER TABLE viewName DROP IF EXISTS PARTITION (a='2')
ALTER TABLE viewName RECOVER PARTITIONS
ALTER TABLE viewName PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')
```

In addition, `ALTER TABLE RENAME PARTITION` is unable to handle data source tables, just like the other `ALTER PARTITION` commands. We should issue an exception instead.

### How was this patch tested?

Added a few test cases.

Author: gatorsmile

Closes #15004 from gatorsmile/altertable.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a6adb16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a6adb16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a6adb16

Branch: refs/heads/master
Commit: 6a6adb1673775df63a62270879eac70f5f8d7d75
Parents: bb32294
Author: gatorsmile
Authored: Thu Sep 15 14:43:10 2016 +0800
Committer: Wenchen Fan
Committed: Thu Sep 15 14:43:10 2016 +0800

--
 .../spark/sql/execution/command/ddl.scala       | 45 +
 .../spark/sql/execution/command/tables.scala    |  4 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 63 ++
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 67 +++-
 4 files changed, 120 insertions(+), 59 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/6a6adb16/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index dcda2f8..c0ccdca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -230,8 +230,8 @@ case class AlterTableSetPropertiesCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    DDLUtils.verifyAlterTableType(catalog, tableName, isView)
     val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView)
     // This overrides old properties
     val newTable = table.copy(properties = table.properties ++ properties)
     catalog.alterTable(newTable)
@@ -258,8 +258,8 @@ case class AlterTableUnsetPropertiesCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    DDLUtils.verifyAlterTableType(catalog, tableName, isView)
     val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView)
     if (!ifExists) {
       propKeys.foreach { k =>
         if (!table.properties.contains(k)) {
@@ -299,6 +299,7 @@ case class AlterTableSerDePropertiesCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     // For datasource tables, disallow setting serde or specifying partition
     if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
@@ -348,6 +349,7 @@ case class AlterTableAddPartitionCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
         "ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
@@ -377,7 +379,14 @@ case class AlterTableRenamePartitionCommand(
     extends RunnableCommand {

   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.sessionState.catalog.renamePartitions(
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    if (DDLUtils.isDatasourceTable(table)) {
+
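The excerpt ends mid-hunk, but the user-visible effect of the new view checks can be sketched with a hedged snippet. This assumes a live `SparkSession` named `spark` with the DDL support needed for `CREATE VIEW`; the exact error message will vary by command:

```scala
// Sketch: after this patch, partition DDL against a view fails analysis
// with an AnalysisException instead of silently mutating catalog state.
import org.apache.spark.sql.AnalysisException

spark.sql("CREATE TABLE tab (a INT, b STRING)")
spark.sql("CREATE VIEW viewName AS SELECT * FROM tab")

try {
  spark.sql("ALTER TABLE viewName RECOVER PARTITIONS")
} catch {
  case e: AnalysisException =>
    println(s"Rejected as expected: ${e.getMessage}")
}
```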