spark git commit: [SPARK-17458][SQL] Alias specified for aggregates in a pivot are not honored

2016-09-15 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 1202075c9 -> b72486f82


[SPARK-17458][SQL] Alias specified for aggregates in a pivot are not honored

## What changes were proposed in this pull request?

This change preserves aliases that are given for pivot aggregations.
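For illustration, a usage sketch of the new column naming (hedged: it assumes a spark-shell style session and the `courseSales` dataset used by `DataFramePivotSuite`, with columns `year`, `course`, `earnings`):

```scala
import org.apache.spark.sql.functions.{avg, sum}
import spark.implicits._  // `spark` is the SparkSession provided by spark-shell

// courseSales: DataFrame(year, course, earnings), as in DataFramePivotSuite
val pivoted = courseSales.groupBy($"year")
  .pivot("course", Seq("dotNET", "Java"))
  .agg(sum($"earnings").as("foo"), avg($"earnings"))

// With this change the "foo" alias is honored in the generated column names:
// Array(year, dotNET_foo, dotNET_avg(`earnings`), Java_foo, Java_avg(`earnings`))
pivoted.columns
```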

## How was this patch tested?

New unit test

Author: Andrew Ray 

Closes #15111 from aray/SPARK-17458.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b72486f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b72486f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b72486f8

Branch: refs/heads/master
Commit: b72486f82dd9920135442191be5d384028e7fb41
Parents: 1202075
Author: Andrew Ray 
Authored: Thu Sep 15 21:45:29 2016 +0200
Committer: Herman van Hovell 
Committed: Thu Sep 15 21:45:29 2016 +0200

--
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala| 10 +-
 .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 11 +++
 2 files changed, 20 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b72486f8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 92bf8e0..5210f42 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -373,7 +373,15 @@ class Analyzer(
   case Pivot(groupByExprs, pivotColumn, pivotValues, aggregates, child) =>
 val singleAgg = aggregates.size == 1
 def outputName(value: Literal, aggregate: Expression): String = {
-  if (singleAgg) value.toString else value + "_" + aggregate.sql
+  if (singleAgg) {
+value.toString
+  } else {
+val suffix = aggregate match {
+  case n: NamedExpression => n.name
+  case _ => aggregate.sql
+}
+value + "_" + suffix
+  }
 }
 if (aggregates.forall(a => PivotFirst.supportsDataType(a.dataType))) {
  // Since evaluating |pivotValues| if statements for each input row can get slow this is an

http://git-wip-us.apache.org/repos/asf/spark/blob/b72486f8/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index d5cb5e1..1bbe135 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -197,4 +197,15 @@ class DataFramePivotSuite extends QueryTest with 
SharedSQLContext{
 Row(2013, Seq(48000.0, 7.0), Seq(3.0, 7.0)) :: Nil
 )
   }
+
+  test("pivot preserves aliases if given") {
+assertResult(
+  Array("year", "dotNET_foo", "dotNET_avg(`earnings`)", "Java_foo", 
"Java_avg(`earnings`)")
+)(
+  courseSales.groupBy($"year")
+.pivot("course", Seq("dotNET", "Java"))
+.agg(sum($"earnings").as("foo"), avg($"earnings")).columns
+)
+  }
+
 }





spark git commit: [SPARK-17484] Prevent invalid block locations from being reported after put() exceptions

2016-09-15 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0169c2edc -> 9c23f4408


[SPARK-17484] Prevent invalid block locations from being reported after put() 
exceptions

## What changes were proposed in this pull request?

If a BlockManager `put()` call failed after the BlockManagerMaster was notified
of a block's availability, then incomplete cleanup logic in a `finally` block
would never send a second block status message to inform the master of the
block's unavailability. This, in turn, leads to fetch failures and used to be
capable of causing complete job failures before #15037 was fixed.

This patch addresses this issue via multiple small changes:

- The `finally` block now calls `removeBlockInternal` when cleaning up from a 
failed `put()`; in addition to removing the `BlockInfo` entry (which was _all_ 
that the old cleanup logic did), this code (redundantly) tries to remove the 
block from the memory and disk stores (as an added layer of defense against 
bugs lower down in the stack) and optionally notifies the master of block 
removal (which now happens during exception-triggered cleanup).
- When a BlockManager receives a request for a block that it does not have, it will now notify the master to update its block locations. This ensures that bad metadata pointing to non-existent blocks will eventually be fixed. Note that I could have implemented this logic in the block manager client (rather than in the remote server), but that would introduce the problem of distinguishing between transient and permanent failures; on the server, however, we know definitively that the block isn't present.
- Catch `NonFatal` instead of `Exception` to avoid swallowing `InterruptedException`s thrown from synchronous block replication calls (a minimal sketch of this pattern follows the list).
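A minimal sketch of the `NonFatal` pattern mentioned in the last point (plain Scala, not the actual BlockManager replication code):

```scala
import scala.util.control.NonFatal

// NonFatal does not match InterruptedException (nor VirtualMachineError, etc.),
// so an interrupt raised during a synchronous replication call propagates
// instead of being swallowed, unlike `catch { case e: Exception => ... }`.
def replicateWithCleanup(doReplicate: () => Unit): Unit = {
  try {
    doReplicate()
  } catch {
    case NonFatal(e) =>
      // transient failure: log and clean up, but let fatal errors/interrupts escape
      println(s"Replication failed, cleaning up: ${e.getMessage}")
  }
}
```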

This patch depends upon the refactorings in #15036, so that other patch will 
also have to be backported when backporting this fix.

For more background on this issue, including example logs from a real 
production failure, see 
[SPARK-17484](https://issues.apache.org/jira/browse/SPARK-17484).

## How was this patch tested?

Two new regression tests in BlockManagerSuite.

Author: Josh Rosen 

Closes #15085 from JoshRosen/SPARK-17484.

(cherry picked from commit 1202075c95eabba0ffebc170077df798f271a139)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c23f440
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c23f440
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c23f440

Branch: refs/heads/branch-2.0
Commit: 9c23f4408d337f4af31ebfbcc78767df67d36aed
Parents: 0169c2e
Author: Josh Rosen 
Authored: Thu Sep 15 11:54:17 2016 -0700
Committer: Josh Rosen 
Committed: Thu Sep 15 11:54:39 2016 -0700

--
 .../org/apache/spark/storage/BlockManager.scala | 37 +++-
 .../spark/storage/BlockManagerSuite.scala   | 34 ++
 2 files changed, 63 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9c23f440/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 48db97a..37dfbd6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -279,7 +279,12 @@ private[spark] class BlockManager(
 } else {
   getLocalBytes(blockId) match {
 case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, 
blockId, buffer)
-case None => throw new BlockNotFoundException(blockId.toString)
+case None =>
+  // If this block manager receives a request for a block that it doesn't have then it's
+  // likely that the master has outdated block statuses for this block. Therefore, we send
+  // an RPC so that this block is marked as being unavailable from this block manager.
+  reportBlockStatus(blockId, BlockStatus.empty)
+  throw new BlockNotFoundException(blockId.toString)
   }
 }
   }
@@ -856,22 +861,38 @@ private[spark] class BlockManager(
 }
 
 val startTimeMs = System.currentTimeMillis
-var blockWasSuccessfullyStored: Boolean = false
+var exceptionWasThrown: Boolean = true
 val result: Option[T] = try {
   val res = putBody(putBlockInfo)
-  blockWasSuccessfullyStored = res.isEmpty
-  res
-} finally {
-  if (blockWasSuccessfullyStored) {
+  exceptionWasThrown = false
+  if (res.isEmpty) {
+// the block was successfully stored

spark git commit: [SPARK-17484] Prevent invalid block locations from being reported after put() exceptions

2016-09-15 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master a6b818200 -> 1202075c9


[SPARK-17484] Prevent invalid block locations from being reported after put() 
exceptions

## What changes were proposed in this pull request?

If a BlockManager `put()` call failed after the BlockManagerMaster was notified
of a block's availability, then incomplete cleanup logic in a `finally` block
would never send a second block status message to inform the master of the
block's unavailability. This, in turn, leads to fetch failures and used to be
capable of causing complete job failures before #15037 was fixed.

This patch addresses this issue via multiple small changes:

- The `finally` block now calls `removeBlockInternal` when cleaning up from a 
failed `put()`; in addition to removing the `BlockInfo` entry (which was _all_ 
that the old cleanup logic did), this code (redundantly) tries to remove the 
block from the memory and disk stores (as an added layer of defense against 
bugs lower down in the stack) and optionally notifies the master of block 
removal (which now happens during exception-triggered cleanup).
- When a BlockManager receives a request for a block that it does not have, it
will now notify the master to update its block locations. This ensures that bad
metadata pointing to non-existent blocks will eventually be fixed. Note that I 
could have implemented this logic in the block manager client (rather than in 
the remote server), but that would introduce the problem of distinguishing 
between transient and permanent failures; on the server, however, we know 
definitively that the block isn't present.
- Catch `NonFatal` instead of `Exception` to avoid swallowing 
`InterruptedException`s thrown from synchronous block replication calls.

This patch depends upon the refactorings in #15036, so that other patch will 
also have to be backported when backporting this fix.

For more background on this issue, including example logs from a real 
production failure, see 
[SPARK-17484](https://issues.apache.org/jira/browse/SPARK-17484).

## How was this patch tested?

Two new regression tests in BlockManagerSuite.

Author: Josh Rosen 

Closes #15085 from JoshRosen/SPARK-17484.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1202075c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1202075c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1202075c

Branch: refs/heads/master
Commit: 1202075c95eabba0ffebc170077df798f271a139
Parents: a6b8182
Author: Josh Rosen 
Authored: Thu Sep 15 11:54:17 2016 -0700
Committer: Josh Rosen 
Committed: Thu Sep 15 11:54:17 2016 -0700

--
 .../org/apache/spark/storage/BlockManager.scala | 37 +++-
 .../spark/storage/BlockManagerSuite.scala   | 34 ++
 2 files changed, 63 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1202075c/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c172ac2..aa29acf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -283,7 +283,12 @@ private[spark] class BlockManager(
 } else {
   getLocalBytes(blockId) match {
 case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, 
blockId, buffer)
-case None => throw new BlockNotFoundException(blockId.toString)
+case None =>
+  // If this block manager receives a request for a block that it doesn't have then it's
+  // likely that the master has outdated block statuses for this block. Therefore, we send
+  // an RPC so that this block is marked as being unavailable from this block manager.
+  reportBlockStatus(blockId, BlockStatus.empty)
+  throw new BlockNotFoundException(blockId.toString)
   }
 }
   }
@@ -859,22 +864,38 @@ private[spark] class BlockManager(
 }
 
 val startTimeMs = System.currentTimeMillis
-var blockWasSuccessfullyStored: Boolean = false
+var exceptionWasThrown: Boolean = true
 val result: Option[T] = try {
   val res = putBody(putBlockInfo)
-  blockWasSuccessfullyStored = res.isEmpty
-  res
-} finally {
-  if (blockWasSuccessfullyStored) {
+  exceptionWasThrown = false
+  if (res.isEmpty) {
+// the block was successfully stored
 if (keepReadLock) {
   blockInfoManager.downgradeLock(blockId)
 } else {
   

spark git commit: [SPARK-17364][SQL] Antlr lexer wrongly treats full qualified identifier as a decimal number token when parsing SQL string

2016-09-15 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 abb89c42e -> 0169c2edc


[SPARK-17364][SQL] Antlr lexer wrongly treats full qualified identifier as a 
decimal number token when parsing SQL string

## What changes were proposed in this pull request?

The Antlr lexer we use to tokenize a SQL string may wrongly tokenize a fully 
qualified identifier as a decimal number token. For example, table identifier 
`default.123_table` is wrongly tokenized as
```
default // Matches lexer rule IDENTIFIER
.123 // Matches lexer rule DECIMAL_VALUE
_TABLE // Matches lexer rule IDENTIFIER
```

The correct tokenization for `default.123_table` should be:
```
default // Matches lexer rule IDENTIFIER,
. // Matches a single dot
123_TABLE // Matches lexer rule IDENTIFIER
```

This PR fixes the Antlr grammar so that it can tokenize fully qualified identifiers correctly (a rough sketch of the new lexer check follows the list):
1. Fully qualified table names can be parsed correctly, for example `select * from database.123_suffix`.
2. Fully qualified column names can be parsed correctly, for example `select a.123_suffix from a`.
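Conceptually, the fix adds a lexer predicate that only accepts a decimal token when the character following it cannot continue an identifier. A rough standalone Scala sketch of that intent (the real check is the `isValidDecimal()` member added to `SqlBase.g4` in the diff below, which inspects the lexer's lookahead character):

```scala
// Standalone illustration only; not the grammar code itself.
def decimalTokenAllowed(nextChar: Char): Boolean =
  !(nextChar.isLetterOrDigit || nextChar == '_')

decimalTokenAllowed('3')  // false: in "2.3", "2." is not a decimal token
decimalTokenAllowed('_')  // false: in "2.3_", "2.3" is not a decimal token
decimalTokenAllowed('+')  // true:  in "34.E2+0.12", "34.E2" is a decimal token
```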

### Before change

#### Case 1: Failed to parse fully qualified column name

```
scala> spark.sql("select a.123_column from a").show
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '.123' expecting {,
...
, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 8)
== SQL ==
select a.123_column from a
^^^
```

#### Case 2: Failed to parse fully qualified table name
```
scala> spark.sql("select * from default.123_table")
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '.123' expecting {,
...
IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 21)

== SQL ==
select * from default.123_table
-^^^
```

### After Change

#### Case 1: fully qualified column name, no ParseException thrown
```
scala> spark.sql("select a.123_column from a").show
```

#### Case 2: fully qualified table name, no ParseException thrown
```
scala> spark.sql("select * from default.123_table")
```

## How was this patch tested?

Unit test.

Author: Sean Zhong 

Closes #15006 from clockfly/SPARK-17364.

(cherry picked from commit a6b8182006d0c3dda67c06861067ca78383ecf1b)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0169c2ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0169c2ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0169c2ed

Branch: refs/heads/branch-2.0
Commit: 0169c2edc35ee918b2972f2f4d4e112ccbdcb0c1
Parents: abb89c4
Author: Sean Zhong 
Authored: Thu Sep 15 20:53:48 2016 +0200
Committer: Herman van Hovell 
Committed: Thu Sep 15 20:54:01 2016 +0200

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 44 
 .../catalyst/parser/ExpressionParserSuite.scala | 15 ++-
 .../parser/TableIdentifierParserSuite.scala | 13 ++
 3 files changed, 63 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0169c2ed/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 4b47fa3..8b72140 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -16,6 +16,30 @@
 
 grammar SqlBase;
 
+@members {
+  /**
+   * Verify whether current token is a valid decimal token (which contains dot).
+   * Returns true if the character that follows the token is not a digit or letter or underscore.
+   *
+   * For example:
+   * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'.
+   * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'.
+   * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'.
+   * For char stream "12.0D 34.E2+0.12 "  12.0D is a valid decimal token because it is followed
+   * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+'
+   * which is not a digit or letter or underscore.
+   */
+  public boolean isValidDecimal() {
+    int nextChar = _input.LA(1);
+    if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' ||
+      nextChar == '_') {
+      return false;
+    } else {
+      return true;
+    }
+  }
+}
+
 tokens {
 DELIMITER
 }
@@ -908,23 +932,22 @@ INTEGER_VALUE
 ;
 
 DECIMAL_VALUE
-: DIGIT+ '.' DIGIT*
-| '.' DIGIT+
+: 

spark git commit: [SPARK-17364][SQL] Antlr lexer wrongly treats full qualified identifier as a decimal number token when parsing SQL string

2016-09-15 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master fe767395f -> a6b818200


[SPARK-17364][SQL] Antlr lexer wrongly treats full qualified identifier as a 
decimal number token when parsing SQL string

## What changes were proposed in this pull request?

The Antlr lexer we use to tokenize a SQL string may wrongly tokenize a fully 
qualified identifier as a decimal number token. For example, table identifier 
`default.123_table` is wrongly tokenized as
```
default // Matches lexer rule IDENTIFIER
.123 // Matches lexer rule DECIMAL_VALUE
_TABLE // Matches lexer rule IDENTIFIER
```

The correct tokenization for `default.123_table` should be:
```
default // Matches lexer rule IDENTIFIER,
. // Matches a single dot
123_TABLE // Matches lexer rule IDENTIFIER
```

This PR fixes the Antlr grammar so that it can tokenize fully qualified identifiers correctly:
1. Fully qualified table names can be parsed correctly, for example `select * from database.123_suffix`.
2. Fully qualified column names can be parsed correctly, for example `select a.123_suffix from a`.

### Before change

#### Case 1: Failed to parse fully qualified column name

```
scala> spark.sql("select a.123_column from a").show
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '.123' expecting {,
...
, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 8)
== SQL ==
select a.123_column from a
^^^
```

#### Case 2: Failed to parse fully qualified table name
```
scala> spark.sql("select * from default.123_table")
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '.123' expecting {,
...
IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 21)

== SQL ==
select * from default.123_table
-^^^
```

### After Change

#### Case 1: fully qualified column name, no ParseException thrown
```
scala> spark.sql("select a.123_column from a").show
```

#### Case 2: fully qualified table name, no ParseException thrown
```
scala> spark.sql("select * from default.123_table")
```

## How was this patch tested?

Unit test.

Author: Sean Zhong 

Closes #15006 from clockfly/SPARK-17364.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6b81820
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6b81820
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6b81820

Branch: refs/heads/master
Commit: a6b8182006d0c3dda67c06861067ca78383ecf1b
Parents: fe76739
Author: Sean Zhong 
Authored: Thu Sep 15 20:53:48 2016 +0200
Committer: Herman van Hovell 
Committed: Thu Sep 15 20:53:48 2016 +0200

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 44 
 .../catalyst/parser/ExpressionParserSuite.scala | 15 ++-
 .../parser/TableIdentifierParserSuite.scala | 13 ++
 3 files changed, 63 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a6b81820/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index b475abd..7023c0c 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -16,6 +16,30 @@
 
 grammar SqlBase;
 
+@members {
+  /**
+   * Verify whether current token is a valid decimal token (which contains dot).
+   * Returns true if the character that follows the token is not a digit or letter or underscore.
+   *
+   * For example:
+   * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'.
+   * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'.
+   * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'.
+   * For char stream "12.0D 34.E2+0.12 "  12.0D is a valid decimal token because it is followed
+   * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+'
+   * which is not a digit or letter or underscore.
+   */
+  public boolean isValidDecimal() {
+    int nextChar = _input.LA(1);
+    if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' ||
+      nextChar == '_') {
+      return false;
+    } else {
+      return true;
+    }
+  }
+}
+
 tokens {
 DELIMITER
 }
@@ -920,23 +944,22 @@ INTEGER_VALUE
 ;
 
 DECIMAL_VALUE
-: DIGIT+ '.' DIGIT*
-| '.' DIGIT+
+: DECIMAL_DIGITS {isValidDecimal()}?
 ;
 
 SCIENTIFIC_DECIMAL_VALUE
-: DIGIT+ ('.' DIGIT*)? EXPONENT
-| '.' DIGIT+ EXPONENT
+: DIGIT+ 

spark git commit: [SPARK-17429][SQL] use ImplicitCastInputTypes with function Length

2016-09-15 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master d403562eb -> fe767395f


[SPARK-17429][SQL] use ImplicitCastInputTypes with function Length

## What changes were proposed in this pull request?
`select length(11)` and `select length(2.0)` currently return errors, while Hive accepts them.
This PR supports casting input types implicitly for the function `length`, so that:
- `select length(11)` returns 2
- `select length(2.0)` returns 3
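A usage sketch of the new behavior, mirroring the updated `StringFunctionsSuite` test below (hedged: assumes a spark-shell style session where `spark` and its implicits are in scope):

```scala
import spark.implicits._

val df = Seq(("123", 123, 2.0f, 3.015)).toDF("a", "c", "d", "e")
// Previously length() on a numeric column threw an AnalysisException; with
// ImplicitCastInputTypes the argument is implicitly cast to a string first.
df.selectExpr("length(a)", "length(c)", "length(d)", "length(e)").collect()
// => Array([3,3,3,5])
```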

Author: 岑玉海 <261810...@qq.com>
Author: cenyuhai 

Closes #15014 from cenyuhai/SPARK-17429.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe767395
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe767395
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe767395

Branch: refs/heads/master
Commit: fe767395ff46ee6236cf53aece85fcd61c0b49d3
Parents: d403562
Author: 岑玉海 <261810...@qq.com>
Authored: Thu Sep 15 20:45:00 2016 +0200
Committer: Herman van Hovell 
Committed: Thu Sep 15 20:45:00 2016 +0200

--
 .../sql/catalyst/expressions/stringExpressions.scala  |  2 +-
 .../scala/org/apache/spark/sql/StringFunctionsSuite.scala | 10 ++
 2 files changed, 7 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fe767395/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index a8c23a8..1bcbb6c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -1057,7 +1057,7 @@ case class Substring(str: Expression, pos: Expression, 
len: Expression)
 @ExpressionDescription(
   usage = "_FUNC_(str | binary) - Returns the length of str or number of bytes 
in binary data.",
   extended = "> SELECT _FUNC_('Spark SQL');\n 9")
-case class Length(child: Expression) extends UnaryExpression with 
ExpectsInputTypes {
+case class Length(child: Expression) extends UnaryExpression with 
ImplicitCastInputTypes {
   override def dataType: DataType = IntegerType
   override def inputTypes: Seq[AbstractDataType] = 
Seq(TypeCollection(StringType, BinaryType))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fe767395/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
index 1cc7746..bcc2351 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -330,7 +330,8 @@ class StringFunctionsSuite extends QueryTest with 
SharedSQLContext {
   }
 
   test("string / binary length function") {
-val df = Seq(("123", Array[Byte](1, 2, 3, 4), 123)).toDF("a", "b", "c")
+val df = Seq(("123", Array[Byte](1, 2, 3, 4), 123, 2.0f, 3.015))
+  .toDF("a", "b", "c", "d", "e")
 checkAnswer(
   df.select(length($"a"), length($"b")),
   Row(3, 4))
@@ -339,9 +340,10 @@ class StringFunctionsSuite extends QueryTest with 
SharedSQLContext {
   df.selectExpr("length(a)", "length(b)"),
   Row(3, 4))
 
-intercept[AnalysisException] {
-  df.selectExpr("length(c)") // int type of the argument is unacceptable
-}
+checkAnswer(
+  df.selectExpr("length(c)", "length(d)", "length(e)"),
+  Row(3, 3, 5)
+)
   }
 
   test("initcap function") {





spark git commit: [SPARK-17114][SQL] Fix aggregates grouped by literals with empty input

2016-09-15 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e77a437d2 -> 62ab53658


[SPARK-17114][SQL] Fix aggregates grouped by literals with empty input

## What changes were proposed in this pull request?
This PR fixes an issue with aggregates that have an empty input and use literals as their grouping keys. These aggregates are currently interpreted as aggregates **without** grouping keys; this triggers the ungrouped code path (which always returns a single row).

This PR fixes the `RemoveLiteralFromGroupExpressions` optimizer rule, which changed the semantics of the Aggregate by eliminating all literal grouping keys.
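A sketch of the semantics being fixed (the empty table `t` here is hypothetical and only used for illustration):

```scala
spark.range(0).toDF("a").createOrReplaceTempView("t")

// Ungrouped aggregate over empty input: one row.
spark.sql("SELECT COUNT(a) FROM t").collect()                 // Array([0])

// Aggregate grouped by a foldable literal over empty input: zero rows, just like
// any other grouping key. Before this fix the literal grouping key was optimized
// away entirely, which incorrectly turned this into the ungrouped case above.
spark.sql("SELECT COUNT(a) FROM t GROUP BY 'foo'").collect()  // Array()
```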

## How was this patch tested?
Added tests to `SQLQueryTestSuite`.

Author: Herman van Hovell 

Closes #15101 from hvanhovell/SPARK-17114-3.

(cherry picked from commit d403562eb4b5b1d804909861d3e8b75d8f6323b9)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62ab5365
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62ab5365
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62ab5365

Branch: refs/heads/branch-2.0
Commit: 62ab536588e19293a84004f547ebc316346b869e
Parents: e77a437
Author: Herman van Hovell 
Authored: Thu Sep 15 20:24:15 2016 +0200
Committer: Herman van Hovell 
Committed: Thu Sep 15 20:24:29 2016 +0200

--
 .../sql/catalyst/optimizer/Optimizer.scala  | 11 -
 .../optimizer/AggregateOptimizeSuite.scala  | 10 +++-
 .../resources/sql-tests/inputs/group-by.sql | 17 +++
 .../sql-tests/results/group-by.sql.out  | 51 
 4 files changed, 86 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/62ab5365/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e743898..d824c2e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1439,9 +1439,16 @@ object ReplaceExceptWithAntiJoin extends 
Rule[LogicalPlan] {
  */
 object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case a @ Aggregate(grouping, _, _) =>
+case a @ Aggregate(grouping, _, _) if grouping.nonEmpty =>
   val newGrouping = grouping.filter(!_.foldable)
-  a.copy(groupingExpressions = newGrouping)
+  if (newGrouping.nonEmpty) {
+a.copy(groupingExpressions = newGrouping)
+  } else {
+// All grouping expressions are literals. We should not drop them all, because this can
+// change the return semantics when the input of the Aggregate is empty (SPARK-17114). We
+// instead replace this by single, easy to hash/sort, literal expression.
+a.copy(groupingExpressions = Seq(Literal(0, IntegerType)))
+  }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/62ab5365/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
index 4c26c18..aecf59a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
@@ -28,7 +28,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 class AggregateOptimizeSuite extends PlanTest {
-  val conf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
+  val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal 
= false)
   val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, 
conf)
   val analyzer = new Analyzer(catalog, conf)
 
@@ -49,6 +49,14 @@ class AggregateOptimizeSuite extends PlanTest {
 comparePlans(optimized, correctAnswer)
   }
 
+  test("do not remove all grouping expressions if they are all literals") {
+val query = testRelation.groupBy(Literal("1"), Literal(1) + 
Literal(2))(sum('b))
+val optimized = Optimize.execute(analyzer.execute(query))
+val 

spark git commit: [SPARK-17547] Ensure temp shuffle data file is cleaned up after error

2016-09-15 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a447cd888 -> 8646b84fb


[SPARK-17547] Ensure temp shuffle data file is cleaned up after error

SPARK-8029 (#9610) modified shuffle writers to first stage their data to a 
temporary file in the same directory as the final destination file and then to 
atomically rename this temporary file at the end of the write job. However, 
this change introduced the potential for the temporary output file to be leaked 
if an exception occurs during the write because the shuffle writers' existing 
error cleanup code doesn't handle deletion of the temp file.

This patch avoids this potential cause of disk-space leaks by adding `finally` 
blocks to ensure that temp files are always deleted if they haven't been 
renamed.
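The pattern, sketched outside the shuffle code (hedged illustration; the actual change lives in the shuffle writers and `IndexShuffleBlockResolver`):

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Write to a temp file in the destination directory, rename atomically on success,
// and always delete the temp file if the rename never happened.
def writeAtomically(output: File)(write: File => Unit): Unit = {
  val tmp = File.createTempFile("shuffle", ".tmp", output.getParentFile)
  try {
    write(tmp)
    Files.move(tmp.toPath, output.toPath, StandardCopyOption.ATOMIC_MOVE)
  } finally {
    // If write() or the rename threw, the temp file is still on disk: remove it
    // so a failed task no longer leaks disk space.
    if (tmp.exists() && !tmp.delete()) {
      System.err.println(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
```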

Author: Josh Rosen 

Closes #15104 from JoshRosen/cleanup-tmp-data-file-in-shuffle-writer.

(cherry picked from commit 5b8f7377d54f83b93ef2bfc2a01ca65fae6d3032)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8646b84f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8646b84f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8646b84f

Branch: refs/heads/branch-1.6
Commit: 8646b84fb8ed319e3a998f93de4821c723f7d419
Parents: a447cd8
Author: Josh Rosen 
Authored: Thu Sep 15 11:22:58 2016 -0700
Committer: Josh Rosen 
Committed: Thu Sep 15 11:24:00 2016 -0700

--
 .../sort/BypassMergeSortShuffleWriter.java  | 10 ++-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 18 +++--
 .../shuffle/IndexShuffleBlockResolver.scala | 80 +++-
 .../spark/shuffle/sort/SortShuffleWriter.scala  | 14 +++-
 4 files changed, 73 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8646b84f/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a1a1fb0..80d24b9 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -157,8 +157,14 @@ final class BypassMergeSortShuffleWriter extends 
ShuffleWriter {
 
 File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
 File tmp = Utils.tempFileWith(output);
-partitionLengths = writePartitionedFile(tmp);
-shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, 
partitionLengths, tmp);
+try {
+  partitionLengths = writePartitionedFile(tmp);
+  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, 
partitionLengths, tmp);
+} finally {
+  if (tmp.exists() && !tmp.delete()) {
+logger.error("Error while deleting temp file {}", 
tmp.getAbsolutePath());
+  }
+}
 mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), 
partitionLengths);
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8646b84f/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 744c300..d5e16fc 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -209,15 +209,21 @@ public class UnsafeShuffleWriter extends 
ShuffleWriter {
 final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
 final File tmp = Utils.tempFileWith(output);
 try {
-  partitionLengths = mergeSpills(spills, tmp);
-} finally {
-  for (SpillInfo spill : spills) {
-if (spill.file.exists() && ! spill.file.delete()) {
-  logger.error("Error while deleting spill file {}", 
spill.file.getPath());
+  try {
+partitionLengths = mergeSpills(spills, tmp);
+  } finally {
+for (SpillInfo spill : spills) {
+  if (spill.file.exists() && ! spill.file.delete()) {
+logger.error("Error while deleting spill file {}", 
spill.file.getPath());
+  }
 }
   }
+  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, 
partitionLengths, tmp);
+} finally {
+  if (tmp.exists() && !tmp.delete()) {
+logger.error("Error while deleting temp file {}", 
tmp.getAbsolutePath());
+  }
 

spark git commit: [SPARK-17114][SQL] Fix aggregates grouped by literals with empty input

2016-09-15 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 5b8f7377d -> d403562eb


[SPARK-17114][SQL] Fix aggregates grouped by literals with empty input

## What changes were proposed in this pull request?
This PR fixes an issue with aggregates that have an empty input and use literals as their grouping keys. These aggregates are currently interpreted as aggregates **without** grouping keys; this triggers the ungrouped code path (which always returns a single row).

This PR fixes the `RemoveLiteralFromGroupExpressions` optimizer rule, which changed the semantics of the Aggregate by eliminating all literal grouping keys.

## How was this patch tested?
Added tests to `SQLQueryTestSuite`.

Author: Herman van Hovell 

Closes #15101 from hvanhovell/SPARK-17114-3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d403562e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d403562e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d403562e

Branch: refs/heads/master
Commit: d403562eb4b5b1d804909861d3e8b75d8f6323b9
Parents: 5b8f737
Author: Herman van Hovell 
Authored: Thu Sep 15 20:24:15 2016 +0200
Committer: Herman van Hovell 
Committed: Thu Sep 15 20:24:15 2016 +0200

--
 .../sql/catalyst/optimizer/Optimizer.scala  | 11 -
 .../optimizer/AggregateOptimizeSuite.scala  | 10 +++-
 .../resources/sql-tests/inputs/group-by.sql | 17 +++
 .../sql-tests/results/group-by.sql.out  | 51 
 4 files changed, 86 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d403562e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d2f0c97..0df16b7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1098,9 +1098,16 @@ object ReplaceExceptWithAntiJoin extends 
Rule[LogicalPlan] {
  */
 object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case a @ Aggregate(grouping, _, _) =>
+case a @ Aggregate(grouping, _, _) if grouping.nonEmpty =>
   val newGrouping = grouping.filter(!_.foldable)
-  a.copy(groupingExpressions = newGrouping)
+  if (newGrouping.nonEmpty) {
+a.copy(groupingExpressions = newGrouping)
+  } else {
+// All grouping expressions are literals. We should not drop them all, because this can
+// change the return semantics when the input of the Aggregate is empty (SPARK-17114). We
+// instead replace this by single, easy to hash/sort, literal expression.
+a.copy(groupingExpressions = Seq(Literal(0, IntegerType)))
+  }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d403562e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
index 4c26c18..aecf59a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
@@ -28,7 +28,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 class AggregateOptimizeSuite extends PlanTest {
-  val conf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
+  val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal 
= false)
   val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, 
conf)
   val analyzer = new Analyzer(catalog, conf)
 
@@ -49,6 +49,14 @@ class AggregateOptimizeSuite extends PlanTest {
 comparePlans(optimized, correctAnswer)
   }
 
+  test("do not remove all grouping expressions if they are all literals") {
+val query = testRelation.groupBy(Literal("1"), Literal(1) + 
Literal(2))(sum('b))
+val optimized = Optimize.execute(analyzer.execute(query))
+val correctAnswer = 
analyzer.execute(testRelation.groupBy(Literal(0))(sum('b)))
+
+comparePlans(optimized, correctAnswer)
+  }
+
   test("Remove 

spark git commit: [SPARK-17547] Ensure temp shuffle data file is cleaned up after error

2016-09-15 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a09c258c9 -> e77a437d2


[SPARK-17547] Ensure temp shuffle data file is cleaned up after error

SPARK-8029 (#9610) modified shuffle writers to first stage their data to a 
temporary file in the same directory as the final destination file and then to 
atomically rename this temporary file at the end of the write job. However, 
this change introduced the potential for the temporary output file to be leaked 
if an exception occurs during the write because the shuffle writers' existing 
error cleanup code doesn't handle deletion of the temp file.

This patch avoids this potential cause of disk-space leaks by adding `finally` 
blocks to ensure that temp files are always deleted if they haven't been 
renamed.

Author: Josh Rosen 

Closes #15104 from JoshRosen/cleanup-tmp-data-file-in-shuffle-writer.

(cherry picked from commit 5b8f7377d54f83b93ef2bfc2a01ca65fae6d3032)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e77a437d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e77a437d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e77a437d

Branch: refs/heads/branch-2.0
Commit: e77a437d292ecda66163a895427d62e4f72e2a25
Parents: a09c258
Author: Josh Rosen 
Authored: Thu Sep 15 11:22:58 2016 -0700
Committer: Josh Rosen 
Committed: Thu Sep 15 11:23:17 2016 -0700

--
 .../sort/BypassMergeSortShuffleWriter.java  | 10 ++-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 18 +++--
 .../shuffle/IndexShuffleBlockResolver.scala | 80 +++-
 .../spark/shuffle/sort/SortShuffleWriter.scala  | 14 +++-
 4 files changed, 73 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e77a437d/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 0e9defe..601dd6e 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -156,8 +156,14 @@ final class BypassMergeSortShuffleWriter extends 
ShuffleWriter {
 
 File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
 File tmp = Utils.tempFileWith(output);
-partitionLengths = writePartitionedFile(tmp);
-shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, 
partitionLengths, tmp);
+try {
+  partitionLengths = writePartitionedFile(tmp);
+  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, 
partitionLengths, tmp);
+} finally {
+  if (tmp.exists() && !tmp.delete()) {
+logger.error("Error while deleting temp file {}", 
tmp.getAbsolutePath());
+  }
+}
 mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), 
partitionLengths);
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e77a437d/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 44e6aa7..c08a5d4 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -207,15 +207,21 @@ public class UnsafeShuffleWriter extends 
ShuffleWriter {
 final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
 final File tmp = Utils.tempFileWith(output);
 try {
-  partitionLengths = mergeSpills(spills, tmp);
-} finally {
-  for (SpillInfo spill : spills) {
-if (spill.file.exists() && ! spill.file.delete()) {
-  logger.error("Error while deleting spill file {}", 
spill.file.getPath());
+  try {
+partitionLengths = mergeSpills(spills, tmp);
+  } finally {
+for (SpillInfo spill : spills) {
+  if (spill.file.exists() && ! spill.file.delete()) {
+logger.error("Error while deleting spill file {}", 
spill.file.getPath());
+  }
 }
   }
+  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, 
partitionLengths, tmp);
+} finally {
+  if (tmp.exists() && !tmp.delete()) {
+logger.error("Error while deleting temp file {}", 
tmp.getAbsolutePath());
+  }
 

spark git commit: [SPARK-17547] Ensure temp shuffle data file is cleaned up after error

2016-09-15 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 0ad8eeb4d -> 5b8f7377d


[SPARK-17547] Ensure temp shuffle data file is cleaned up after error

SPARK-8029 (#9610) modified shuffle writers to first stage their data to a 
temporary file in the same directory as the final destination file and then to 
atomically rename this temporary file at the end of the write job. However, 
this change introduced the potential for the temporary output file to be leaked 
if an exception occurs during the write because the shuffle writers' existing 
error cleanup code doesn't handle deletion of the temp file.

This patch avoids this potential cause of disk-space leaks by adding `finally` 
blocks to ensure that temp files are always deleted if they haven't been 
renamed.

Author: Josh Rosen 

Closes #15104 from JoshRosen/cleanup-tmp-data-file-in-shuffle-writer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b8f7377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b8f7377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b8f7377

Branch: refs/heads/master
Commit: 5b8f7377d54f83b93ef2bfc2a01ca65fae6d3032
Parents: 0ad8eeb
Author: Josh Rosen 
Authored: Thu Sep 15 11:22:58 2016 -0700
Committer: Josh Rosen 
Committed: Thu Sep 15 11:22:58 2016 -0700

--
 .../sort/BypassMergeSortShuffleWriter.java  | 10 ++-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 18 +++--
 .../shuffle/IndexShuffleBlockResolver.scala | 80 +++-
 .../spark/shuffle/sort/SortShuffleWriter.scala  | 14 +++-
 4 files changed, 73 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5b8f7377/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 0fcc56d..4a15559 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -160,8 +160,14 @@ final class BypassMergeSortShuffleWriter extends 
ShuffleWriter {
 
 File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
 File tmp = Utils.tempFileWith(output);
-partitionLengths = writePartitionedFile(tmp);
-shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, 
partitionLengths, tmp);
+try {
+  partitionLengths = writePartitionedFile(tmp);
+  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, 
partitionLengths, tmp);
+} finally {
+  if (tmp.exists() && !tmp.delete()) {
+logger.error("Error while deleting temp file {}", 
tmp.getAbsolutePath());
+  }
+}
 mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), 
partitionLengths);
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b8f7377/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 63d376b..f235c43 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -210,15 +210,21 @@ public class UnsafeShuffleWriter extends 
ShuffleWriter {
 final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
 final File tmp = Utils.tempFileWith(output);
 try {
-  partitionLengths = mergeSpills(spills, tmp);
-} finally {
-  for (SpillInfo spill : spills) {
-if (spill.file.exists() && ! spill.file.delete()) {
-  logger.error("Error while deleting spill file {}", 
spill.file.getPath());
+  try {
+partitionLengths = mergeSpills(spills, tmp);
+  } finally {
+for (SpillInfo spill : spills) {
+  if (spill.file.exists() && ! spill.file.delete()) {
+logger.error("Error while deleting spill file {}", 
spill.file.getPath());
+  }
 }
   }
+  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, 
partitionLengths, tmp);
+} finally {
+  if (tmp.exists() && !tmp.delete()) {
+logger.error("Error while deleting temp file {}", 
tmp.getAbsolutePath());
+  }
 }
-shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, 
partitionLengths, tmp);
 mapStatus = 

spark git commit: [SPARK-17379][BUILD] Upgrade netty-all to 4.0.41 final for bug fixes

2016-09-15 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master b47927814 -> 0ad8eeb4d


[SPARK-17379][BUILD] Upgrade netty-all to 4.0.41 final for bug fixes

## What changes were proposed in this pull request?
Upgrade netty-all to the latest release in the 4.0.x line, 4.0.41, which mentions several bug fixes and performance improvements we may find useful; see netty.io/news/2016/08/29/4-0-41-Final-4-1-5-Final.html. Initially tried to use 4.1.5 but noticed it's not backwards compatible.

## How was this patch tested?
Existing unit tests against branch-1.6 and branch-2.0 using IBM Java 8 on 
Intel, Power and Z architectures

Author: Adam Roberts 

Closes #14961 from a-roberts/netty.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ad8eeb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ad8eeb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ad8eeb4

Branch: refs/heads/master
Commit: 0ad8eeb4d365c2fff5715ec22fbcf4c69c3340fd
Parents: b479278
Author: Adam Roberts 
Authored: Thu Sep 15 10:40:10 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Sep 15 10:40:10 2016 -0700

--
 .../main/java/org/apache/spark/network/util/TransportConf.java  | 5 +
 dev/deps/spark-deps-hadoop-2.2  | 2 +-
 dev/deps/spark-deps-hadoop-2.3  | 2 +-
 dev/deps/spark-deps-hadoop-2.4  | 2 +-
 dev/deps/spark-deps-hadoop-2.6  | 2 +-
 dev/deps/spark-deps-hadoop-2.7  | 2 +-
 pom.xml | 2 +-
 7 files changed, 11 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0ad8eeb4/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
--
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 0efc400..7d5baa9 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -23,6 +23,11 @@ import com.google.common.primitives.Ints;
  * A central location that tracks all the settings we expose to users.
  */
 public class TransportConf {
+  
+  static {
+// Set this due to Netty PR #5661 for Netty 4.0.37+ to work
+System.setProperty("io.netty.maxDirectMemory", "0");
+  }
 
   private final String SPARK_NETWORK_IO_MODE_KEY;
   private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;

http://git-wip-us.apache.org/repos/asf/spark/blob/0ad8eeb4/dev/deps/spark-deps-hadoop-2.2
--
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 81adde6..a7259e2 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -124,7 +124,7 @@ metrics-json-3.1.2.jar
 metrics-jvm-3.1.2.jar
 minlog-1.3.0.jar
 netty-3.8.0.Final.jar
-netty-all-4.0.29.Final.jar
+netty-all-4.0.41.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0ad8eeb4/dev/deps/spark-deps-hadoop-2.3
--
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 75ab628..6986ab5 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -131,7 +131,7 @@ metrics-jvm-3.1.2.jar
 minlog-1.3.0.jar
 mx4j-3.0.2.jar
 netty-3.8.0.Final.jar
-netty-all-4.0.29.Final.jar
+netty-all-4.0.41.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0ad8eeb4/dev/deps/spark-deps-hadoop-2.4
--
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 897d802..75cccb3 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -131,7 +131,7 @@ metrics-jvm-3.1.2.jar
 minlog-1.3.0.jar
 mx4j-3.0.2.jar
 netty-3.8.0.Final.jar
-netty-all-4.0.29.Final.jar
+netty-all-4.0.41.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0ad8eeb4/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index f95ddb1..ef7b8a7 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -139,7 +139,7 @@ 

spark git commit: [SPARK-17451][CORE] CoarseGrainedExecutorBackend should inform driver before self-kill

2016-09-15 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 2ad276954 -> b47927814


[SPARK-17451][CORE] CoarseGrainedExecutorBackend should inform driver before 
self-kill

## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-17451

`CoarseGrainedExecutorBackend` exits the JVM in some failure cases. While this is not a problem in itself, the driver UI captures no specific reason for the exit. In this PR, I am adding functionality to `exitExecutor` to notify the driver that the executor is exiting.
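A rough sketch of the idea (hedged: the actual RPC sent to the driver is abstracted as a callback here, since the diff below is truncated before that point):

```scala
import scala.util.control.NonFatal

def exitExecutor(
    code: Int,
    reason: String,
    throwable: Throwable = null,
    notifyDriver: Boolean = true)(sendToDriver: String => Unit): Unit = {
  val message = "Executor self-exiting due to : " + reason
  if (throwable != null) System.err.println(s"$message\n$throwable") else System.err.println(message)
  if (notifyDriver) {
    // Best effort: tell the driver why this executor is going away, so the UI can
    // show the real reason instead of a generic "Remote RPC client disassociated".
    try sendToDriver(message) catch {
      case NonFatal(e) => System.err.println(s"Unable to notify the driver: ${e.getMessage}")
    }
  }
  System.exit(code)
}
```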

## How was this patch tested?

Ran the change in a test env and took down the shuffle service before the executor could register with it. In the driver logs, where the job failure reason is mentioned (i.e. `Job aborted due to stage ...`), it gives the correct reason:
Before:
`ExecutorLostFailure (executor Z exited caused by one of the running 
tasks) Reason: Remote RPC client disassociated. Likely due to containers 
exceeding thresholds, or network issues. Check driver logs for WARN messages.`

After:
`ExecutorLostFailure (executor Z exited caused by one of the running 
tasks) Reason: Unable to create executor due to 
java.util.concurrent.TimeoutException: Timeout waiting for task.`

Author: Tejas Patil 

Closes #15013 from tejasapatil/SPARK-17451_inform_driver.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4792781
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4792781
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4792781

Branch: refs/heads/master
Commit: b479278142728eb003b9ee466fab0e8d6ec4b13d
Parents: 2ad2769
Author: Tejas Patil 
Authored: Thu Sep 15 10:23:41 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Sep 15 10:23:41 2016 -0700

--
 .../executor/CoarseGrainedExecutorBackend.scala | 26 +++-
 .../org/apache/spark/storage/BlockManager.scala |  3 +++
 2 files changed, 23 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b4792781/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 391b97d..7eec4ae 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -65,7 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   case Success(msg) =>
 // Always receive `true`. Just ignore it
   case Failure(e) =>
-exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
+exitExecutor(1, s"Cannot register with driver: $driverUrl", e, 
notifyDriver = false)
 }(ThreadUtils.sameThread)
   }
 
@@ -129,7 +129,8 @@ private[spark] class CoarseGrainedExecutorBackend(
 if (stopping.get()) {
   logInfo(s"Driver from $remoteAddress disconnected during shutdown")
 } else if (driver.exists(_.address == remoteAddress)) {
-  exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
+  exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", 
null,
+notifyDriver = false)
 } else {
   logWarning(s"An unknown ($remoteAddress) driver disconnected.")
 }
@@ -148,12 +149,25 @@ private[spark] class CoarseGrainedExecutorBackend(
* executor exits differently. For e.g. when an executor goes down,
* back-end may not want to take the parent process down.
*/
-  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = 
null) = {
+  protected def exitExecutor(code: Int,
+ reason: String,
+ throwable: Throwable = null,
+ notifyDriver: Boolean = true) = {
+val message = "Executor self-exiting due to : " + reason
 if (throwable != null) {
-  logError(reason, throwable)
+  logError(message, throwable)
 } else {
-  logError(reason)
+  logError(message)
 }
+
+if (notifyDriver && driver.nonEmpty) {
+ 

spark git commit: [SPARK-17317][SPARKR] Add SparkR vignette to branch 2.0

2016-09-15 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5c2bc8360 -> a09c258c9


[SPARK-17317][SPARKR] Add SparkR vignette to branch 2.0

## What changes were proposed in this pull request?

This PR adds the SparkR vignette to branch 2.0; it serves as a friendly guide 
to the functionality provided by SparkR.

## How was this patch tested?

R unit test.

Author: junyangq 
Author: Shivaram Venkataraman 
Author: Junyang Qian 

Closes #15100 from junyangq/SPARKR-vignette-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a09c258c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a09c258c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a09c258c

Branch: refs/heads/branch-2.0
Commit: a09c258c9a97e701fa7650cc0651e3c6a7a1cab9
Parents: 5c2bc83
Author: junyangq 
Authored: Thu Sep 15 10:00:36 2016 -0700
Committer: Shivaram Venkataraman 
Committed: Thu Sep 15 10:00:36 2016 -0700

--
 R/create-docs.sh |  11 +-
 R/pkg/vignettes/sparkr-vignettes.Rmd | 643 ++
 2 files changed, 652 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a09c258c/R/create-docs.sh
--
diff --git a/R/create-docs.sh b/R/create-docs.sh
index d2ae160..0dfba22 100755
--- a/R/create-docs.sh
+++ b/R/create-docs.sh
@@ -17,11 +17,13 @@
 # limitations under the License.
 #
 
-# Script to create API docs for SparkR
-# This requires `devtools` and `knitr` to be installed on the machine.
+# Script to create API docs and vignettes for SparkR
+# This requires `devtools`, `knitr` and `rmarkdown` to be installed on the 
machine.
 
 # After running this script the html docs can be found in 
 # $SPARK_HOME/R/pkg/html
+# The vignettes can be found in
+# $SPARK_HOME/R/pkg/vignettes/sparkr_vignettes.html
 
 set -o pipefail
 set -e
@@ -43,4 +45,9 @@ Rscript -e 'libDir <- "../../lib"; library(SparkR, 
lib.loc=libDir); library(knit
 
 popd
 
+# render creates SparkR vignettes
+Rscript -e 'library(rmarkdown); paths <- .libPaths(); .libPaths(c("lib", 
paths)); Sys.setenv(SPARK_HOME=tools::file_path_as_absolute("..")); 
render("pkg/vignettes/sparkr-vignettes.Rmd"); .libPaths(paths)'
+
+find pkg/vignettes/. -not -name '.' -not -name '*.Rmd' -not -name '*.md' -not 
-name '*.pdf' -not -name '*.html' -delete
+
 popd

http://git-wip-us.apache.org/repos/asf/spark/blob/a09c258c/R/pkg/vignettes/sparkr-vignettes.Rmd
--
diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd 
b/R/pkg/vignettes/sparkr-vignettes.Rmd
new file mode 100644
index 000..5156c9e
--- /dev/null
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -0,0 +1,643 @@
+---
+title: "SparkR - Practical Guide"
+output:
+  html_document:
+theme: united
+toc: true
+toc_depth: 4
+toc_float: true
+highlight: textmate
+---
+
+## Overview
+
+SparkR is an R package that provides a light-weight frontend to use Apache 
Spark from R. With Spark `r packageVersion("SparkR")`, SparkR provides a 
distributed data frame implementation that supports data processing operations 
like selection, filtering, aggregation etc. and distributed machine learning 
using [MLlib](http://spark.apache.org/mllib/).
+
+## Getting Started
+
+We begin with an example running on the local machine and provide an overview 
of the use of SparkR: data ingestion, data processing and machine learning.
+
+First, let's load and attach the package.
+```{r, message=FALSE}
+library(SparkR)
+```
+
+`SparkSession` is the entry point into SparkR which connects your R program to 
a Spark cluster. You can create a `SparkSession` using `sparkR.session` and 
pass in options such as the application name, any Spark packages depended on, 
etc.
+
+We use default settings in which it runs in local mode. It auto downloads 
Spark package in the background if no previous installation is found. For more 
details about setup, see [Spark Session](#SetupSparkSession).
+
+```{r, message=FALSE}
+sparkR.session()
+```
+
+The operations in SparkR are centered around an R class called 
`SparkDataFrame`. It is a distributed collection of data organized into named 
columns, which is conceptually equivalent to a table in a relational database 
or a data frame in R, but with richer optimizations under the hood.
+
+`SparkDataFrame` can be constructed from a wide array of sources such as: 
structured data files, tables in Hive, external databases, or existing local R 
data frames. For example, we create a `SparkDataFrame` from a local R data 
frame,
+
+```{r}
+cars <- cbind(model = rownames(mtcars), mtcars)

spark git commit: [SPARK-17406][BUILD][HOTFIX] MiMa excludes fix

2016-09-15 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 71a65825c -> 2ad276954


[SPARK-17406][BUILD][HOTFIX] MiMa excludes fix

## What changes were proposed in this pull request?

Following https://github.com/apache/spark/pull/14969, the MiMa excludes were 
for some reason incomplete but still passed the PR builder. This adds 3 more 
excludes from 
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.2/1749/consoleFull

It also moves the excludes into their own Seq in the build, as they probably 
should have been all along.
Even though this is merged to 2.1.x / master only, I left the exclude in for 
2.0.x in case we back-port. It's a private API, so it is always a false 
positive.

## How was this patch tested?

Jenkins build

Author: Sean Owen 

Closes #15110 from srowen/SPARK-17406.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ad27695
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ad27695
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ad27695

Branch: refs/heads/master
Commit: 2ad276954858b0a7b3f442b9e440c72cbb1610e2
Parents: 71a6582
Author: Sean Owen 
Authored: Thu Sep 15 13:54:41 2016 +0100
Committer: Sean Owen 
Committed: Thu Sep 15 13:54:41 2016 +0100

--
 project/MimaExcludes.scala | 29 +
 1 file changed, 17 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2ad27695/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 37fff2e..1bdcf9a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -426,18 +426,6 @@ object MimaExcludes {
   
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatusListener.this"),
   
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.scheduler.BatchInfo.streamIdToNumRecords"),
   
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.storageStatusList"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorIdToData"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksActive"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksComplete"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputRecords"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleRead"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksFailed"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleWrite"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToDuration"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputBytes"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToLogUrls"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputBytes"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputRecords"),
   
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.this"),
   
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.storage.StorageListener.storageStatusList"),
   
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ExceptionFailure.apply"),
@@ -807,6 +795,23 @@ object MimaExcludes {
   // SPARK-17096: Improve exception string reported through the 
StreamingQueryListener
   
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.stackTrace"),
   
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.this")
+) ++ Seq(
+  // SPARK-17406 limit timeline executor events
+  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorIdToData"),
+  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksActive"),
+  

spark git commit: [SPARK-17536][SQL] Minor performance improvement to JDBC batch inserts

2016-09-15 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master ad79fc0a8 -> 71a65825c


[SPARK-17536][SQL] Minor performance improvement to JDBC batch inserts

## What changes were proposed in this pull request?

Hoist a loop-invariant value (the number of fields in the row schema) out of 
the per-row while loop used for JDBC batch inserts.
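
A minimal sketch of the pattern (not the JdbcUtils code itself): compute the 
invariant once before the loop instead of on every iteration.

```
// Sketch only: `numFields` is computed once, rather than once per row.
def sumAllCells(rows: Iterator[Array[Int]], schemaFieldCount: Int): Long = {
  val numFields = schemaFieldCount // hoisted out of the row loop
  var sum = 0L
  while (rows.hasNext) {
    val row = rows.next()
    var i = 0
    while (i < numFields) {
      sum += row(i)
      i += 1
    }
  }
  sum
}
```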

## How was this patch tested?

Unit tests were run, specifically `mvn test` for the sql module.

Author: John Muller 

Closes #15098 from blue666man/SPARK-17536.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71a65825
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71a65825
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71a65825

Branch: refs/heads/master
Commit: 71a65825c5d5d0886ac3e11f9945cfcb39573ac3
Parents: ad79fc0
Author: John Muller 
Authored: Thu Sep 15 10:00:28 2016 +0100
Committer: Sean Owen 
Committed: Thu Sep 15 10:00:28 2016 +0100

--
 .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/71a65825/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 132472a..b09fd51 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -590,12 +590,12 @@ object JdbcUtils extends Logging {
   val stmt = insertStatement(conn, table, rddSchema, dialect)
   val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
 .map(makeSetter(conn, dialect, _)).toArray
+  val numFields = rddSchema.fields.length
 
   try {
 var rowCount = 0
 while (iterator.hasNext) {
   val row = iterator.next()
-  val numFields = rddSchema.fields.length
   var i = 0
   while (i < numFields) {
 if (row.isNullAt(i)) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17406][WEB UI] limit timeline executor events

2016-09-15 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 647ee05e5 -> ad79fc0a8


[SPARK-17406][WEB UI] limit timeline executor events

## What changes were proposed in this pull request?
The job page becomes too slow to open when there are thousands of executor 
events (added or removed). I found that in the ExecutorsTab file, 
executorIdToData never removes elements, so it grows without bound. Before this 
PR it looks like 
[timeline1.png](https://issues.apache.org/jira/secure/attachment/12827112/timeline1.png); 
after this PR it looks like 
[timeline2.png](https://issues.apache.org/jira/secure/attachment/12827113/timeline2.png) 
(we can configure how many executor events are displayed).
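
A rough sketch of the underlying idea (illustrative only; the class below is 
not the actual ExecutorsTab code): keep a bounded buffer of executor 
added/removed events and evict the oldest entries once a configured maximum is 
exceeded.

```
import scala.collection.mutable.ListBuffer

final case class ExecutorEvent(executorId: String, time: Long, removed: Boolean)

// Retain at most `maxEvents` events for the timeline; older ones are dropped.
class BoundedExecutorEvents(maxEvents: Int) {
  private val events = ListBuffer.empty[ExecutorEvent]

  def add(event: ExecutorEvent): Unit = synchronized {
    events += event
    if (events.size > maxEvents) {
      events.remove(0, events.size - maxEvents) // evict the oldest events
    }
  }

  def snapshot: Seq[ExecutorEvent] = synchronized { events.toList }
}
```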

Author: cenyuhai 

Closes #14969 from cenyuhai/SPARK-17406.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad79fc0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad79fc0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad79fc0a

Branch: refs/heads/master
Commit: ad79fc0a8407a950a03869f2f8cdc3ed0bf13875
Parents: 647ee05
Author: cenyuhai 
Authored: Thu Sep 15 09:58:53 2016 +0100
Committer: Sean Owen 
Committed: Thu Sep 15 09:58:53 2016 +0100

--
 .../apache/spark/ui/exec/ExecutorsPage.scala|  41 +++
 .../org/apache/spark/ui/exec/ExecutorsTab.scala | 112 +++
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  |  66 +--
 .../apache/spark/ui/jobs/ExecutorTable.scala|   3 +-
 .../org/apache/spark/ui/jobs/JobPage.scala  |  67 ++-
 .../org/apache/spark/ui/jobs/StagePage.scala|   4 +-
 .../scala/org/apache/spark/ui/jobs/UIData.scala |   5 -
 project/MimaExcludes.scala  |  12 ++
 8 files changed, 162 insertions(+), 148 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ad79fc0a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala 
b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 982e891..7953d77 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.ui.exec
 
-import java.net.URLEncoder
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
 import org.apache.spark.status.api.v1.ExecutorSummary
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 // This isn't even used anymore -- but we need to keep it b/c of a MiMa false 
positive
 private[ui] case class ExecutorSummaryInfo(
@@ -83,18 +81,7 @@ private[spark] object ExecutorsPage {
 val memUsed = status.memUsed
 val maxMem = status.maxMem
 val diskUsed = status.diskUsed
-val totalCores = listener.executorToTotalCores.getOrElse(execId, 0)
-val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0)
-val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
-val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
-val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
-val totalTasks = activeTasks + failedTasks + completedTasks
-val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
-val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L)
-val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
-val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
-val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 
0L)
-val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
+val taskSummary = listener.executorToTaskSummary.getOrElse(execId, 
ExecutorTaskSummary(execId))
 
 new ExecutorSummary(
   execId,
@@ -103,19 +90,19 @@ private[spark] object ExecutorsPage {
   rddBlocks,
   memUsed,
   diskUsed,
-  totalCores,
-  maxTasks,
-  activeTasks,
-  failedTasks,
-  completedTasks,
-  totalTasks,
-  totalDuration,
-  totalGCTime,
-  totalInputBytes,
-  totalShuffleRead,
-  totalShuffleWrite,
+  taskSummary.totalCores,
+  taskSummary.tasksMax,
+  taskSummary.tasksActive,
+  taskSummary.tasksFailed,
+  taskSummary.tasksComplete,
+  taskSummary.tasksActive + taskSummary.tasksFailed + 
taskSummary.tasksComplete,
+  taskSummary.duration,
+  taskSummary.jvmGCTime,
+  taskSummary.inputBytes,
+  taskSummary.shuffleRead,
+  taskSummary.shuffleWrite,
   maxMem,
-  

spark git commit: [SPARK-17521] Error when I use sparkContext.makeRDD(Seq())

2016-09-15 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master f893e2625 -> 647ee05e5


[SPARK-17521] Error when I use sparkContext.makeRDD(Seq())

## What changes were proposed in this pull request?

When I use `sc.makeRDD` as below:
```
val data3 = sc.makeRDD(Seq())
println(data3.partitions.length)
```
I got an error:
Exception in thread "main" java.lang.IllegalArgumentException: Positive number 
of slices required

We can fix this bug by modifying just the last line to take `seq.size` into 
account:
```
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 
defaultParallelism), indexToPrefs)
  }
```

## How was this patch tested?

Manual tests.

Author: codlife <1004910...@qq.com>
Author: codlife 

Closes #15077 from codlife/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/647ee05e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/647ee05e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/647ee05e

Branch: refs/heads/master
Commit: 647ee05e5815bde361662a9286ac602c44b4d4e6
Parents: f893e26
Author: codlife <1004910...@qq.com>
Authored: Thu Sep 15 09:38:13 2016 +0100
Committer: Sean Owen 
Committed: Thu Sep 15 09:38:13 2016 +0100

--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/647ee05e/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e32e4aa..35b6334 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -795,7 +795,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
 assertNotStopped()
 val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
-new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
+new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), 
indexToPrefs)
   }
 
   /**


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17521] Error when I use sparkContext.makeRDD(Seq())

2016-09-15 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 bb2bdb440 -> 5c2bc8360


[SPARK-17521] Error when I use sparkContext.makeRDD(Seq())

## What changes were proposed in this pull request?

When I use `sc.makeRDD` as below:
```
val data3 = sc.makeRDD(Seq())
println(data3.partitions.length)
```
I got an error:
Exception in thread "main" java.lang.IllegalArgumentException: Positive number 
of slices required

We can fix this bug by modifying just the last line to take `seq.size` into 
account:
```
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 
defaultParallelism), indexToPrefs)
  }
```

## How was this patch tested?

Manual tests.

Author: codlife <1004910...@qq.com>
Author: codlife 

Closes #15077 from codlife/master.

(cherry picked from commit 647ee05e5815bde361662a9286ac602c44b4d4e6)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c2bc836
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c2bc836
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c2bc836

Branch: refs/heads/branch-2.0
Commit: 5c2bc8360019fb08e2e62e50bb261f7ce19b231e
Parents: bb2bdb4
Author: codlife <1004910...@qq.com>
Authored: Thu Sep 15 09:38:13 2016 +0100
Committer: Sean Owen 
Committed: Thu Sep 15 09:38:22 2016 +0100

--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5c2bc836/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 71511b8..214758f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -788,7 +788,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
 assertNotStopped()
 val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
-new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
+new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), 
indexToPrefs)
   }
 
   /**


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17524][TESTS] Use specified spark.buffer.pageSize

2016-09-15 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master d15b4f90e -> f893e2625


[SPARK-17524][TESTS] Use specified spark.buffer.pageSize

## What changes were proposed in this pull request?

This PR makes the appendRowUntilExceedingPageSize test in 
RowBasedKeyValueBatchSuite use whatever `spark.buffer.pageSize` value the user 
has specified, preventing a test failure for anyone testing Apache Spark on a 
box with a reduced page size. The test was previously hardcoded to the default 
page size of 64 MB, so this minor PR is a test improvement.

## How was this patch tested?
Existing unit tests with 1 MB page size and with 64 MB (the default) page size

Author: Adam Roberts 

Closes #15079 from a-roberts/patch-5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f893e262
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f893e262
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f893e262

Branch: refs/heads/master
Commit: f893e262500e2f183de88e984300dd5b085e1f71
Parents: d15b4f9
Author: Adam Roberts 
Authored: Thu Sep 15 09:37:12 2016 +0100
Committer: Sean Owen 
Committed: Thu Sep 15 09:37:12 2016 +0100

--
 .../sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java   | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f893e262/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
--
diff --git 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
index 0dd129c..fb3dbe8 100644
--- 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
+++ 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
@@ -338,15 +338,17 @@ public class RowBasedKeyValueBatchSuite {
 
   @Test
   public void appendRowUntilExceedingPageSize() throws Exception {
+// Use default size or spark.buffer.pageSize if specified
+int pageSizeToUse = (int) memoryManager.pageSizeBytes();
 RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
-valueSchema, taskMemoryManager, 64 * 1024 * 1024); //enough 
capacity
+valueSchema, taskMemoryManager, pageSizeToUse); //enough capacity
 try {
   UnsafeRow key = makeKeyRow(1, "A");
   UnsafeRow value = makeValueRow(1, 1);
   int recordLength = 8 + key.getSizeInBytes() + value.getSizeInBytes() + 8;
   int totalSize = 4;
   int numRows = 0;
-  while (totalSize + recordLength < 64 * 1024 * 1024) { // default page 
size
+  while (totalSize + recordLength < pageSizeToUse) {
 appendRow(batch, key, value);
 totalSize += recordLength;
 numRows++;


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17507][ML][MLLIB] check weight vector size in ANN

2016-09-15 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 6a6adb167 -> d15b4f90e


[SPARK-17507][ML][MLLIB] check weight vector size in ANN

## What changes were proposed in this pull request?

As the TODO describes, check the weight vector size and throw an exception if 
it does not match the expected size.
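
The shape of the check is roughly the following sketch (a standalone 
simplification, not the FeedForwardModel code):

```
// Sketch: fail fast if the flattened weight vector has the wrong length.
def checkWeights(weights: Array[Double], layerWeightSizes: Seq[Int]): Unit = {
  val expected = layerWeightSizes.sum
  require(weights.length == expected,
    s"Expected weight vector of size $expected but got size ${weights.length}.")
}
```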

## How was this patch tested?

Existing tests.

Author: WeichenXu 

Closes #15060 from WeichenXu123/check_input_weight_size_of_ann.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d15b4f90
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d15b4f90
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d15b4f90

Branch: refs/heads/master
Commit: d15b4f90e64f7ec5cf14c7c57d2cb4234c3ce677
Parents: 6a6adb1
Author: WeichenXu 
Authored: Thu Sep 15 09:30:15 2016 +0100
Committer: Sean Owen 
Committed: Thu Sep 15 09:30:15 2016 +0100

--
 mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala | 10 --
 1 file changed, 4 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d15b4f90/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
index 88909a9..e7e0dae 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
@@ -545,7 +545,9 @@ private[ann] object FeedForwardModel {
* @return model
*/
   def apply(topology: FeedForwardTopology, weights: Vector): FeedForwardModel 
= {
-// TODO: check that weights size is equal to sum of layers sizes
+val expectedWeightSize = topology.layers.map(_.weightSize).sum
+require(weights.size == expectedWeightSize,
+  s"Expected weight vector of size ${expectedWeightSize} but got size 
${weights.size}.")
 new FeedForwardModel(weights, topology)
   }
 
@@ -559,11 +561,7 @@ private[ann] object FeedForwardModel {
   def apply(topology: FeedForwardTopology, seed: Long = 11L): FeedForwardModel 
= {
 val layers = topology.layers
 val layerModels = new Array[LayerModel](layers.length)
-var totalSize = 0
-for (i <- 0 until topology.layers.length) {
-  totalSize += topology.layers(i).weightSize
-}
-val weights = BDV.zeros[Double](totalSize)
+val weights = BDV.zeros[Double](topology.layers.map(_.weightSize).sum)
 var offset = 0
 val random = new XORShiftRandom(seed)
 for (i <- 0 until layers.length) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17440][SPARK-17441] Fixed Multiple Bugs in ALTER TABLE

2016-09-15 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master bb3229436 -> 6a6adb167


[SPARK-17440][SPARK-17441] Fixed Multiple Bugs in ALTER TABLE

### What changes were proposed in this pull request?
For the following `ALTER TABLE` DDL, we should issue an exception when the 
target table is a `VIEW`:
```SQL
 ALTER TABLE viewName SET LOCATION '/path/to/your/lovely/heart'

 ALTER TABLE viewName SET SERDE 'whatever'

 ALTER TABLE viewName SET SERDEPROPERTIES ('x' = 'y')

 ALTER TABLE viewName PARTITION (a=1, b=2) SET SERDEPROPERTIES ('x' = 'y')

 ALTER TABLE viewName ADD IF NOT EXISTS PARTITION (a='4', b='8')

 ALTER TABLE viewName DROP IF EXISTS PARTITION (a='2')

 ALTER TABLE viewName RECOVER PARTITIONS

 ALTER TABLE viewName PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', 
b='p')
```

In addition, `ALTER TABLE RENAME PARTITION` is unable to handle data source 
tables, just like the other `ALTER PARTITION` commands. We should issue an 
exception instead.
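
The guard shared by these commands (`DDLUtils.verifyAlterTableType` in the diff 
below) can be pictured roughly as follows; this is a self-contained sketch with 
hypothetical types, not the actual DDLUtils implementation:

```
// Hypothetical stand-ins for Spark's catalog table type and analysis error.
sealed trait TableKind
case object PlainTable extends TableKind
case object ViewTable extends TableKind

// Reject ALTER TABLE commands whose target is a view, and ALTER VIEW commands
// whose target is a table.
def verifyAlterTableType(target: TableKind, commandExpectsView: Boolean): Unit =
  (target, commandExpectsView) match {
    case (ViewTable, false) =>
      throw new IllegalArgumentException("Cannot alter a view with ALTER TABLE")
    case (PlainTable, true) =>
      throw new IllegalArgumentException("Cannot alter a table with ALTER VIEW")
    case _ => () // allowed combination
  }
```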

### How was this patch tested?
Added a few test cases.

Author: gatorsmile 

Closes #15004 from gatorsmile/altertable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a6adb16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a6adb16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a6adb16

Branch: refs/heads/master
Commit: 6a6adb1673775df63a62270879eac70f5f8d7d75
Parents: bb32294
Author: gatorsmile 
Authored: Thu Sep 15 14:43:10 2016 +0800
Committer: Wenchen Fan 
Committed: Thu Sep 15 14:43:10 2016 +0800

--
 .../spark/sql/execution/command/ddl.scala   | 45 +
 .../spark/sql/execution/command/tables.scala|  4 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 63 ++
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 67 +++-
 4 files changed, 120 insertions(+), 59 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6a6adb16/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index dcda2f8..c0ccdca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -230,8 +230,8 @@ case class AlterTableSetPropertiesCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
-DDLUtils.verifyAlterTableType(catalog, tableName, isView)
 val table = catalog.getTableMetadata(tableName)
+DDLUtils.verifyAlterTableType(catalog, table, isView)
 // This overrides old properties
 val newTable = table.copy(properties = table.properties ++ properties)
 catalog.alterTable(newTable)
@@ -258,8 +258,8 @@ case class AlterTableUnsetPropertiesCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
-DDLUtils.verifyAlterTableType(catalog, tableName, isView)
 val table = catalog.getTableMetadata(tableName)
+DDLUtils.verifyAlterTableType(catalog, table, isView)
 if (!ifExists) {
   propKeys.foreach { k =>
 if (!table.properties.contains(k)) {
@@ -299,6 +299,7 @@ case class AlterTableSerDePropertiesCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 // For datasource tables, disallow setting serde or specifying partition
 if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
   throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
@@ -348,6 +349,7 @@ case class AlterTableAddPartitionCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 if (DDLUtils.isDatasourceTable(table)) {
   throw new AnalysisException(
 "ALTER TABLE ADD PARTITION is not allowed for tables defined using the 
datasource API")
@@ -377,7 +379,14 @@ case class AlterTableRenamePartitionCommand(
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-sparkSession.sessionState.catalog.renamePartitions(
+val catalog = sparkSession.sessionState.catalog
+val table = catalog.getTableMetadata(tableName)
+if (DDLUtils.isDatasourceTable(table)) {
+