spark git commit: [SPARK-18339][SPARK-18513][SQL] Don't push down current_timestamp for filters in StructuredStreaming and persist batch and watermark timestamps to offset log.

2016-11-28 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 27a1a5c99 -> ea6957da2


[SPARK-18339][SPARK-18513][SQL] Don't push down current_timestamp for filters 
in StructuredStreaming and persist batch and watermark timestamps to offset log.

## What changes were proposed in this pull request?

For the following workflow:
1. I have a column called time which is at minute-level precision in a 
Streaming DataFrame
2. I want to perform groupBy time, count
3. Then I want my MemorySink to only have the last 30 minutes of counts, and I 
do this with
.where('time >= current_timestamp().cast("long") - 30 * 60)

What happens is that the `filter` gets pushed down below the aggregation, so 
it is applied to the source data of the aggregation instead of to the result 
of the aggregation (where I actually want to filter).
The main issue is that `current_timestamp` is non-deterministic in the 
streaming context, so a filter that uses it should not be pushed down.
Whether this requires us to store the `current_timestamp` for each trigger of 
the streaming job is something to discuss.
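
The effect described above can be reproduced without Spark. The following is a minimal sketch in plain Scala, not the optimizer's actual behavior: `PushdownSketch`, `Counts`, and both `filter*` functions are hypothetical names, and the running `Map` only stands in for the state of a streaming aggregation in complete mode.

```scala
object PushdownSketch {
  // Running counts keyed by event time in seconds (toy stand-in for aggregation state).
  type Counts = Map[Long, Long]

  // Fold one batch of event times into the running counts.
  def aggregate(state: Counts, batch: Seq[Long]): Counts =
    batch.foldLeft(state) { (acc, t) => acc.updated(t, acc.getOrElse(t, 0L) + 1L) }

  // Intended plan: aggregate first, then filter the aggregated result.
  // Returns (new state, rows emitted to the sink).
  def filterAfterAggregation(
      state: Counts, batch: Seq[Long], nowSec: Long, windowSec: Long): (Counts, Counts) = {
    val next = aggregate(state, batch)
    (next, next.filter { case (t, _) => t >= nowSec - windowSec })
  }

  // Pushed-down plan: the filter only sees the new input rows, so groups that
  // are already in the state are emitted no matter how old they are.
  def filterBeforeAggregation(
      state: Counts, batch: Seq[Long], nowSec: Long, windowSec: Long): (Counts, Counts) = {
    val next = aggregate(state, batch.filter(_ >= nowSec - windowSec))
    (next, next)
  }

  def main(args: Array[String]): Unit = {
    val window = 10L
    val batch1 = Seq(0L, 5L, 5L) // processed when the clock reads 5
    val batch2 = Seq(20L)        // processed when the clock reads 20

    val (afterState, _) = filterAfterAggregation(Map.empty, batch1, 5L, window)
    val (_, afterOut) = filterAfterAggregation(afterState, batch2, 20L, window)

    val (beforeState, _) = filterBeforeAggregation(Map.empty, batch1, 5L, window)
    val (_, beforeOut) = filterBeforeAggregation(beforeState, batch2, 20L, window)

    println(s"filter after aggregation:  $afterOut")
    println(s"filter before aggregation: $beforeOut")
  }
}
```

In this toy run the filter applied after the aggregation emits only the group for time 20, while the pushed-down filter still emits the stale groups for times 0 and 5.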

Furthermore, we want to persist current batch timestamp and watermark timestamp 
to the offset log so that these values are consistent across multiple 
executions of the same batch.
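
A minimal sketch of why persisting these values makes re-execution consistent, in plain Scala. `ToyOffsetLog`, `BatchMetadata`, and `getOrRecord` are hypothetical names for illustration and are not the classes touched by this patch; the in-memory map stands in for a durable log.

```scala
import scala.collection.mutable

object OffsetLogSketch {
  // Values that must stay fixed for a given batch (hypothetical shape).
  final case class BatchMetadata(batchTimestampMs: Long, batchWatermarkMs: Long)

  // Toy log: the first execution of a batch records its metadata,
  // and any later execution of the same batch reads it back.
  final class ToyOffsetLog {
    private val entries = mutable.Map.empty[Long, BatchMetadata]

    // `fresh` is evaluated only if the batch has not been recorded yet.
    def getOrRecord(batchId: Long, fresh: => BatchMetadata): BatchMetadata =
      entries.getOrElseUpdate(batchId, fresh)
  }

  def main(args: Array[String]): Unit = {
    val log = new ToyOffsetLog
    // First execution of batch 0 at wall-clock time 1000 ms.
    val first = log.getOrRecord(0L, BatchMetadata(1000L, 0L))
    // Re-execution after a restart at wall-clock time 2000 ms.
    val replay = log.getOrRecord(0L, BatchMetadata(2000L, 500L))
    println(first == replay) // the replay sees the originally recorded values
  }
}
```

The design point is that the timestamp comes from the log rather than from the clock whenever the batch has already been planned once.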

brkyvz zsxwing tdas

## How was this patch tested?

A test was added to StreamingAggregationSuite ensuring the above use case is 
handled. The test injects a stream of time values (in seconds) to a query that 
runs in complete mode and only outputs the (count) aggregation results for the 
past 10 seconds.

Author: Tyson Condie 

Closes #15949 from tcondie/SPARK-18339.

(cherry picked from commit 3c0beea4752d39ee630a107316f40aff4a1b4ae7)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea6957da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea6957da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea6957da

Branch: refs/heads/branch-2.1
Commit: ea6957da20d3e03b95342a03a188c7ab5880cac7
Parents: 27a1a5c
Author: Tyson Condie 
Authored: Mon Nov 28 23:07:17 2016 -0800
Committer: Tathagata Das 
Committed: Mon Nov 28 23:08:47 2016 -0800

--
 .../expressions/datetimeExpressions.scala   |  33 +-
 .../streaming/IncrementalExecution.scala|  19 +++-
 .../execution/streaming/StreamExecution.scala   |  67 ++---
 .../execution/streaming/StreamProgress.scala|   4 +-
 .../spark/sql/execution/streaming/memory.scala  |   4 +
 .../StreamExecutionMetadataSuite.scala  |  35 +++
 .../streaming/StreamingAggregationSuite.scala   | 100 +++
 .../sql/streaming/StreamingQuerySuite.scala |   4 +-
 .../spark/sql/streaming/WatermarkSuite.scala|  40 +---
 9 files changed, 273 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ea6957da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 1db1d19..ef1ac36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import java.sql.Timestamp
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Locale, TimeZone}
 
 import scala.util.Try
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback,
-  ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -72,6 +72,35 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }
 
 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType)
+  extends LeafExpression with Nondeterministic with 

spark git commit: [SPARK-18339][SPARK-18513][SQL] Don't push down current_timestamp for filters in StructuredStreaming and persist batch and watermark timestamps to offset log.

2016-11-28 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master e2318ede0 -> 3c0beea47


[SPARK-18339][SPARK-18513][SQL] Don't push down current_timestamp for filters 
in StructuredStreaming and persist batch and watermark timestamps to offset log.

## What changes were proposed in this pull request?

For the following workflow:
1. I have a column called time which is at minute-level precision in a 
Streaming DataFrame
2. I want to perform groupBy time, count
3. Then I want my MemorySink to only have the last 30 minutes of counts, and I 
do this with
.where('time >= current_timestamp().cast("long") - 30 * 60)

What happens is that the `filter` gets pushed down below the aggregation, so 
it is applied to the source data of the aggregation instead of to the result 
of the aggregation (where I actually want to filter).
The main issue is that `current_timestamp` is non-deterministic in the 
streaming context, so a filter that uses it should not be pushed down.
Whether this requires us to store the `current_timestamp` for each trigger of 
the streaming job is something to discuss.

Furthermore, we want to persist current batch timestamp and watermark timestamp 
to the offset log so that these values are consistent across multiple 
executions of the same batch.

brkyvz zsxwing tdas

## How was this patch tested?

A test was added to StreamingAggregationSuite ensuring the above use case is 
handled. The test injects a stream of time values (in seconds) to a query that 
runs in complete mode and only outputs the (count) aggregation results for the 
past 10 seconds.

Author: Tyson Condie 

Closes #15949 from tcondie/SPARK-18339.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c0beea4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c0beea4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c0beea4

Branch: refs/heads/master
Commit: 3c0beea4752d39ee630a107316f40aff4a1b4ae7
Parents: e2318ed
Author: Tyson Condie 
Authored: Mon Nov 28 23:07:17 2016 -0800
Committer: Tathagata Das 
Committed: Mon Nov 28 23:07:17 2016 -0800

--
 .../expressions/datetimeExpressions.scala   |  33 +-
 .../streaming/IncrementalExecution.scala|  19 +++-
 .../execution/streaming/StreamExecution.scala   |  67 ++---
 .../execution/streaming/StreamProgress.scala|   4 +-
 .../spark/sql/execution/streaming/memory.scala  |   4 +
 .../StreamExecutionMetadataSuite.scala  |  35 +++
 .../streaming/StreamingAggregationSuite.scala   | 100 +++
 .../sql/streaming/StreamingQuerySuite.scala |   4 +-
 .../spark/sql/streaming/WatermarkSuite.scala|  40 +---
 9 files changed, 273 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3c0beea4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 1db1d19..ef1ac36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import java.sql.Timestamp
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Locale, TimeZone}
 
 import scala.util.Try
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback,
-  ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -72,6 +72,35 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }
 
 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType)
+  extends LeafExpression with Nondeterministic with CodegenFallback {
+
+  override def nullable: Boolean = false
+
+  override def prettyName: String = "current_batch_timestamp"
+
+  

spark git commit: [SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location

2016-11-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1759cf69a -> 27a1a5c99


[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location

## What changes were proposed in this pull request?

We failed to properly propagate table metadata for existing tables for the 
saveAsTable command. This caused a downstream component to think the table was 
MANAGED, writing data to the wrong location.
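
The decision this fix changes can be sketched in plain Scala. This is an illustration under assumed, simplified types: `TableType`, `Storage`, `TableMeta`, and `resolve` are hypothetical stand-ins, not Spark's catalog classes.

```scala
object TableTypeSketch {
  sealed trait TableType
  case object Managed extends TableType
  case object External extends TableType

  // Simplified stand-ins for storage format and catalog metadata.
  final case class Storage(locationUri: Option[String])
  final case class TableMeta(tableType: TableType, storage: Storage)

  // If the table already exists, reuse its recorded storage and type;
  // only derive them from the writer options for a new table.
  def resolve(existing: Option[TableMeta], fromOptions: Storage): (Storage, TableType) =
    existing match {
      case Some(table) => (table.storage, table.tableType)
      case None =>
        val tableType: TableType =
          if (fromOptions.locationUri.isDefined) External else Managed
        (fromOptions, tableType)
    }

  def main(args: Array[String]): Unit = {
    val external = TableMeta(External, Storage(Some("/data/events")))
    // Appending without a path option must not turn the table into a managed one.
    println(resolve(Some(external), Storage(None)))
    // A new table with no location is managed.
    println(resolve(None, Storage(None)))
  }
}
```

Deriving the type from the options alone is what would classify an existing external table as managed when the append carries no location.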

## How was this patch tested?

Unit test that fails before the patch.

Author: Eric Liang 

Closes #15983 from ericl/spark-18544.

(cherry picked from commit e2318ede04fa7a756d1c8151775e1f2406a176ca)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27a1a5c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27a1a5c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27a1a5c9

Branch: refs/heads/branch-2.1
Commit: 27a1a5c99ff471ee15b56995d56cfd39b3ffe6e8
Parents: 1759cf6
Author: Eric Liang 
Authored: Mon Nov 28 21:58:01 2016 -0800
Committer: Reynold Xin 
Committed: Mon Nov 28 21:58:10 2016 -0800

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 21 
 .../command/createDataSourceTables.scala|  3 ++-
 .../PartitionProviderCompatibilitySuite.scala   | 19 ++
 3 files changed, 34 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/27a1a5c9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d86342..8294e41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 throw new AnalysisException(s"Table $tableIdent already exists.")
 
   case _ =>
-val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-val tableType = if (storage.locationUri.isDefined) {
+val existingTable = if (tableExists) {
+  Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+} else {
+  None
+}
+val storage = if (tableExists) {
+  existingTable.get.storage
+} else {
+  DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+}
+val tableType = if (tableExists) {
+  existingTable.get.tableType
+} else if (storage.locationUri.isDefined) {
   CatalogTableType.EXTERNAL
 } else {
   CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 )
 df.sparkSession.sessionState.executePlan(
   CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
-if (tableDesc.partitionColumnNames.nonEmpty &&
-df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-  // Need to recover partitions into the metastore so our saved data is visible.
-  df.sparkSession.sessionState.executePlan(
-AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
-}
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/27a1a5c9/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index add732c..422700c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand(
   className = provider,
   partitionColumns = table.partitionColumnNames,
   bucketSpec = table.bucketSpec,
-  options = table.storage.properties ++ pathOption)
+  options = table.storage.properties ++ pathOption,
+  catalogTable = Some(table))
 
 val result = try {
   dataSource.write(mode, df)

http://git-wip-us.apache.org/repos/asf/spark/blob/27a1a5c9/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
--
diff --git 

spark git commit: [SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location

2016-11-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master d449988b8 -> e2318ede0


[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location

## What changes were proposed in this pull request?

We failed to properly propagate table metadata for existing tables for the 
saveAsTable command. This caused a downstream component to think the table was 
MANAGED, writing data to the wrong location.

## How was this patch tested?

Unit test that fails before the patch.

Author: Eric Liang 

Closes #15983 from ericl/spark-18544.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2318ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2318ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2318ede

Branch: refs/heads/master
Commit: e2318ede04fa7a756d1c8151775e1f2406a176ca
Parents: d449988
Author: Eric Liang 
Authored: Mon Nov 28 21:58:01 2016 -0800
Committer: Reynold Xin 
Committed: Mon Nov 28 21:58:01 2016 -0800

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 21 
 .../command/createDataSourceTables.scala|  3 ++-
 .../PartitionProviderCompatibilitySuite.scala   | 19 ++
 3 files changed, 34 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d86342..8294e41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 throw new AnalysisException(s"Table $tableIdent already exists.")
 
   case _ =>
-val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-val tableType = if (storage.locationUri.isDefined) {
+val existingTable = if (tableExists) {
+  Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+} else {
+  None
+}
+val storage = if (tableExists) {
+  existingTable.get.storage
+} else {
+  DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+}
+val tableType = if (tableExists) {
+  existingTable.get.tableType
+} else if (storage.locationUri.isDefined) {
   CatalogTableType.EXTERNAL
 } else {
   CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 )
 df.sparkSession.sessionState.executePlan(
   CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
-if (tableDesc.partitionColumnNames.nonEmpty &&
-df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-  // Need to recover partitions into the metastore so our saved data is visible.
-  df.sparkSession.sessionState.executePlan(
-AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
-}
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index add732c..422700c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand(
   className = provider,
   partitionColumns = table.partitionColumnNames,
   bucketSpec = table.bucketSpec,
-  options = table.storage.properties ++ pathOption)
+  options = table.storage.properties ++ pathOption,
+  catalogTable = Some(table))
 
 val result = try {
   dataSource.write(mode, df)

http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index 

spark git commit: [SPARK-18058][SQL][TRIVIAL] Use dataType.sameResult(...) instead equality on asNullable datatypes

2016-11-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 c4cbdc864 -> 1759cf69a


[SPARK-18058][SQL][TRIVIAL] Use dataType.sameResult(...) instead equality on 
asNullable datatypes

## What changes were proposed in this pull request?
This is absolutely minor. PR https://github.com/apache/spark/pull/15595 uses 
`dt1.asNullable == dt2.asNullable` expressions in a few places. It is however 
more efficient to call `dt1.sameType(dt2)`. I have replaced every instance of 
the first pattern with the second pattern (3/5 were introduced by #15595).
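
To illustrate the difference between the two patterns, here is a toy sketch in plain Scala. `DType`, `ArrayT`, `asNullable`, and `sameType` below are hypothetical re-implementations on a tiny type tree, not Spark's `DataType` code; the assumption is that the normalizing approach rebuilds both trees before comparing, while the direct comparison walks them once and allocates nothing.

```scala
object SameTypeSketch {
  sealed trait DType
  case object IntT extends DType
  case object StringT extends DType
  final case class ArrayT(element: DType, containsNull: Boolean) extends DType

  // First pattern: build a fully nullable copy of the type tree.
  def asNullable(dt: DType): DType = dt match {
    case ArrayT(element, _) => ArrayT(asNullable(element), containsNull = true)
    case other => other
  }

  // Second pattern: compare structure directly, ignoring nullability flags.
  def sameType(a: DType, b: DType): Boolean = (a, b) match {
    case (ArrayT(e1, _), ArrayT(e2, _)) => sameType(e1, e2)
    case _ => a == b
  }

  def main(args: Array[String]): Unit = {
    val left = ArrayT(ArrayT(IntT, containsNull = false), containsNull = true)
    val right = ArrayT(ArrayT(IntT, containsNull = true), containsNull = false)
    // Both patterns agree on the answer; only the work done differs.
    println(asNullable(left) == asNullable(right))
    println(sameType(left, right))
  }
}
```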

## How was this patch tested?
Existing tests.

Author: Herman van Hovell 

Closes #16041 from hvanhovell/SPARK-18058.

(cherry picked from commit d449988b8819775fcfd27da53bb5143a7aab01f7)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1759cf69
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1759cf69
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1759cf69

Branch: refs/heads/branch-2.1
Commit: 1759cf69aa1a7059a5fe78d012a54bc0ba02677c
Parents: c4cbdc8
Author: Herman van Hovell 
Authored: Mon Nov 28 21:43:33 2016 -0800
Committer: Reynold Xin 
Committed: Mon Nov 28 21:43:38 2016 -0800

--
 .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +-
 .../sql/catalyst/expressions/conditionalExpressions.scala  | 2 +-
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 6 +++---
 .../spark/sql/execution/datasources/DataSourceStrategy.scala   | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1759cf69/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 26d2638..db41752 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -306,7 +306,7 @@ trait CheckAnalysis extends PredicateHelper {
   // Check if the data types match.
   dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
 // SPARK-18058: we shall not care about the nullability of columns
-if (dt1.asNullable != dt2.asNullable) {
+if (!dt1.sameType(dt2)) {
   failAnalysis(
 s"""
   |${operator.nodeName} can only be performed on tables with the compatible

http://git-wip-us.apache.org/repos/asf/spark/blob/1759cf69/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index a7d9e2d..afc190e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -41,7 +41,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
 if (predicate.dataType != BooleanType) {
   TypeCheckResult.TypeCheckFailure(
 s"type of predicate expression in If should be boolean, not ${predicate.dataType}")
-} else if (trueValue.dataType.asNullable != falseValue.dataType.asNullable) {
+} else if (!trueValue.dataType.sameType(falseValue.dataType)) {
   TypeCheckResult.TypeCheckFailure(s"differing types in '$sql' " +
 s"(${trueValue.dataType.simpleString} and ${falseValue.dataType.simpleString}).")
 } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/1759cf69/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index dd6c8fd..da42df3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 

spark git commit: [SPARK-18058][SQL][TRIVIAL] Use dataType.sameResult(...) instead equality on asNullable datatypes

2016-11-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 8b325b17e -> d449988b8


[SPARK-18058][SQL][TRIVIAL] Use dataType.sameResult(...) instead equality on 
asNullable datatypes

## What changes were proposed in this pull request?
This is absolutely minor. PR https://github.com/apache/spark/pull/15595 uses 
`dt1.asNullable == dt2.asNullable` expressions in a few places. It is however 
more efficient to call `dt1.sameType(dt2)`. I have replaced every instance of 
the first pattern with the second pattern (3/5 were introduced by #15595).

## How was this patch tested?
Existing tests.

Author: Herman van Hovell 

Closes #16041 from hvanhovell/SPARK-18058.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d449988b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d449988b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d449988b

Branch: refs/heads/master
Commit: d449988b8819775fcfd27da53bb5143a7aab01f7
Parents: 8b325b1
Author: Herman van Hovell 
Authored: Mon Nov 28 21:43:33 2016 -0800
Committer: Reynold Xin 
Committed: Mon Nov 28 21:43:33 2016 -0800

--
 .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +-
 .../sql/catalyst/expressions/conditionalExpressions.scala  | 2 +-
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 6 +++---
 .../spark/sql/execution/datasources/DataSourceStrategy.scala   | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d449988b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 26d2638..db41752 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -306,7 +306,7 @@ trait CheckAnalysis extends PredicateHelper {
   // Check if the data types match.
   dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
 // SPARK-18058: we shall not care about the nullability of columns
-if (dt1.asNullable != dt2.asNullable) {
+if (!dt1.sameType(dt2)) {
   failAnalysis(
 s"""
   |${operator.nodeName} can only be performed on tables with the compatible

http://git-wip-us.apache.org/repos/asf/spark/blob/d449988b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index a7d9e2d..afc190e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -41,7 +41,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
 if (predicate.dataType != BooleanType) {
   TypeCheckResult.TypeCheckFailure(
 s"type of predicate expression in If should be boolean, not ${predicate.dataType}")
-} else if (trueValue.dataType.asNullable != falseValue.dataType.asNullable) {
+} else if (!trueValue.dataType.sameType(falseValue.dataType)) {
   TypeCheckResult.TypeCheckFailure(s"differing types in '$sql' " +
 s"(${trueValue.dataType.simpleString} and ${falseValue.dataType.simpleString}).")
 } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/d449988b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 4e333d5..7aaefc8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -134,7 +134,7 @@ abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends Binar
 childrenResolved 

spark git commit: [SPARK-18547][CORE] Propagate I/O encryption key when executors register.

2016-11-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 45e2b3c0e -> c4cbdc864


[SPARK-18547][CORE] Propagate I/O encryption key when executors register.

This change modifies the method used to propagate encryption keys used during
shuffle. Instead of relying on YARN's UserGroupInformation credential propagation,
this change explicitly distributes the key using the messages exchanged between
driver and executor during registration. When RPC encryption is enabled, this means
key propagation is also secure.

This allows shuffle encryption to work in non-YARN mode, which means that it's
easier to write unit tests for areas of the code that are affected by the feature.

The key is stored in the SecurityManager; because there are many instances of
that class used in the code, the key is only guaranteed to exist in the instance
managed by the SparkEnv. This path was chosen to avoid storing the key in the
SparkConf, which would risk having the key being written to disk as part of the
configuration (as, for example, is done when starting YARN applications).
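
The flow described above can be sketched in plain Scala. Everything here is a hypothetical stand-in: `RegistrationRequest`, `RegistrationReply`, `ToyDriver`, and `ToySecurityManager` are illustrative names and do not reproduce Spark's RPC messages or its `SecurityManager`; only the idea of carrying the key in the registration exchange, rather than in the configuration, is taken from the commit message.

```scala
import java.security.SecureRandom

object KeyPropagationSketch {
  // Hypothetical registration messages exchanged between executor and driver.
  final case class RegistrationRequest(executorId: String)
  final case class RegistrationReply(ioEncryptionKey: Option[Array[Byte]])

  // Toy holder for the key; the key never goes through a configuration map.
  final class ToySecurityManager(ioEncryptionKey: Option[Array[Byte]] = None) {
    def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey
  }

  final class ToyDriver(encryptionEnabled: Boolean) {
    // The driver creates the key once, only when encryption is enabled.
    private val key: Option[Array[Byte]] =
      if (encryptionEnabled) {
        val bytes = new Array[Byte](16)
        new SecureRandom().nextBytes(bytes)
        Some(bytes)
      } else {
        None
      }

    val securityManager = new ToySecurityManager(key)

    // The key travels in the reply to the executor's registration request.
    def handle(request: RegistrationRequest): RegistrationReply = RegistrationReply(key)
  }

  // The executor builds its own holder from the key it received.
  def startExecutor(driver: ToyDriver, executorId: String): ToySecurityManager = {
    val reply = driver.handle(RegistrationRequest(executorId))
    new ToySecurityManager(reply.ioEncryptionKey)
  }

  def main(args: Array[String]): Unit = {
    val driver = new ToyDriver(encryptionEnabled = true)
    val executor = startExecutor(driver, "exec-1")
    println(executor.getIOEncryptionKey().isDefined)
  }
}
```

Because the sketch has no dependency on a cluster manager, the same exchange works in any deployment mode, which mirrors the testing benefit the commit message mentions.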

Tested by new and existing unit tests (which were moved from the YARN module to
core), and by running apps with shuffle encryption enabled.

Author: Marcelo Vanzin 

Closes #15981 from vanzin/SPARK-18547.

(cherry picked from commit 8b325b17ecdf013b7a6edcb7ee3773546bd914df)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4cbdc86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4cbdc86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4cbdc86

Branch: refs/heads/branch-2.1
Commit: c4cbdc864f7191ab1d49cdc360fe78ec16f48db5
Parents: 45e2b3c
Author: Marcelo Vanzin 
Authored: Mon Nov 28 21:10:57 2016 -0800
Committer: Shixiong Zhu 
Committed: Mon Nov 28 21:11:04 2016 -0800

--
 .../org/apache/spark/SecurityManager.scala  |  23 +---
 .../scala/org/apache/spark/SparkContext.scala   |   4 -
 .../main/scala/org/apache/spark/SparkEnv.scala  |  33 +++--
 .../executor/CoarseGrainedExecutorBackend.scala |   6 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |   7 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   6 +-
 .../spark/security/CryptoStreamUtils.scala  |  28 ++--
 .../spark/serializer/SerializerManager.scala|  18 ++-
 .../spark/security/CryptoStreamUtilsSuite.scala | 135 +++
 docs/configuration.md   |   3 +-
 .../spark/executor/MesosExecutorBackend.scala   |   2 +-
 .../cluster/mesos/MesosClusterManager.scala |   4 +
 .../mesos/MesosClusterManagerSuite.scala|  11 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |   5 -
 .../spark/deploy/yarn/IOEncryptionSuite.scala   | 108 ---
 15 files changed, 166 insertions(+), 227 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c4cbdc86/core/src/main/scala/org/apache/spark/SecurityManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 199365a..87fe563 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -21,7 +21,6 @@ import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
 import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
-import javax.crypto.KeyGenerator
 import javax.net.ssl._
 
 import com.google.common.hash.HashCodes
@@ -33,7 +32,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.sasl.SecretKeyHolder
-import org.apache.spark.security.CryptoStreamUtils._
 import org.apache.spark.util.Utils
 
 /**
@@ -185,7 +183,9 @@ import org.apache.spark.util.Utils
  *  setting `spark.ssl.useNodeLocalConf` to `true`.
  */
 
-private[spark] class SecurityManager(sparkConf: SparkConf)
+private[spark] class SecurityManager(
+sparkConf: SparkConf,
+ioEncryptionKey: Option[Array[Byte]] = None)
   extends Logging with SecretKeyHolder {
 
   import SecurityManager._
@@ -415,6 +415,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
 logInfo("Changing acls enabled to: " + aclsOn)
   }
 
+  def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey
+
   /**
* Generates or looks up the secret key.
*
@@ -559,19 +561,4 @@ private[spark] object SecurityManager {
   // key used to store the spark secret in the Hadoop UGI
   val SECRET_LOOKUP_KEY = "sparkCookie"
 
-  /**
-   * Setup the cryptographic key used by IO 

spark git commit: [SPARK-18547][CORE] Propagate I/O encryption key when executors register.

2016-11-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 1633ff3b6 -> 8b325b17e


[SPARK-18547][CORE] Propagate I/O encryption key when executors register.

This change modifies the method used to propagate encryption keys used during
shuffle. Instead of relying on YARN's UserGroupInformation credential
propagation, this change explicitly distributes the key using the messages
exchanged between driver and executor during registration. When RPC encryption
is enabled, this means key propagation is also secure.

This allows shuffle encryption to work in non-YARN mode, which means that it's
easier to write unit tests for areas of the code that are affected by the
feature.

The key is stored in the SecurityManager; because there are many instances of
that class used in the code, the key is only guaranteed to exist in the instance
managed by the SparkEnv. This path was chosen to avoid storing the key in the
SparkConf, which would risk having the key being written to disk as part of the
configuration (as, for example, is done when starting YARN applications).
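
A minimal Python sketch of the idea described above, assuming a simplified
registration handshake. The names `Driver`, `Executor`, `RegistrationReply`
and the property key are hypothetical and are not Spark's classes; the point
is only that the key travels in the registration reply and never enters the
configuration properties.

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RegistrationReply:
    """Hypothetical reply the driver sends to a registering executor."""
    properties: dict
    io_encryption_key: Optional[bytes]


class Driver:
    def __init__(self, encryption_enabled: bool, key_size_bits: int = 128):
        # The key is generated once and held in memory only; it is kept
        # out of the properties, which may be written to disk.
        self._key = os.urandom(key_size_bits // 8) if encryption_enabled else None
        self.properties = {"io.encryption.enabled": str(encryption_enabled).lower()}

    def register_executor(self, executor_id: str) -> RegistrationReply:
        # Every executor receives the same key as part of registration.
        return RegistrationReply(dict(self.properties), self._key)


class Executor:
    def __init__(self, executor_id: str, driver: Driver):
        reply = driver.register_executor(executor_id)
        self.properties = reply.properties
        self.io_encryption_key = reply.io_encryption_key
```

Because the key is only carried by the reply object, anything that serializes
`properties` (for example to a file) cannot leak it.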

Tested by new and existing unit tests (which were moved from the YARN module to
core), and by running apps with shuffle encryption enabled.

Author: Marcelo Vanzin 

Closes #15981 from vanzin/SPARK-18547.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b325b17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b325b17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b325b17

Branch: refs/heads/master
Commit: 8b325b17ecdf013b7a6edcb7ee3773546bd914df
Parents: 1633ff3
Author: Marcelo Vanzin 
Authored: Mon Nov 28 21:10:57 2016 -0800
Committer: Shixiong Zhu 
Committed: Mon Nov 28 21:10:57 2016 -0800

--
 .../org/apache/spark/SecurityManager.scala  |  23 +---
 .../scala/org/apache/spark/SparkContext.scala   |   4 -
 .../main/scala/org/apache/spark/SparkEnv.scala  |  33 +++--
 .../executor/CoarseGrainedExecutorBackend.scala |   6 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |   7 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   6 +-
 .../spark/security/CryptoStreamUtils.scala  |  28 ++--
 .../spark/serializer/SerializerManager.scala|  18 ++-
 .../spark/security/CryptoStreamUtilsSuite.scala | 135 +++
 docs/configuration.md   |   3 +-
 .../spark/executor/MesosExecutorBackend.scala   |   2 +-
 .../cluster/mesos/MesosClusterManager.scala |   4 +
 .../mesos/MesosClusterManagerSuite.scala|  11 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |   5 -
 .../spark/deploy/yarn/IOEncryptionSuite.scala   | 108 ---
 15 files changed, 166 insertions(+), 227 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8b325b17/core/src/main/scala/org/apache/spark/SecurityManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala 
b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 199365a..87fe563 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -21,7 +21,6 @@ import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
 import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
-import javax.crypto.KeyGenerator
 import javax.net.ssl._
 
 import com.google.common.hash.HashCodes
@@ -33,7 +32,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.sasl.SecretKeyHolder
-import org.apache.spark.security.CryptoStreamUtils._
 import org.apache.spark.util.Utils
 
 /**
@@ -185,7 +183,9 @@ import org.apache.spark.util.Utils
  *  setting `spark.ssl.useNodeLocalConf` to `true`.
  */
 
-private[spark] class SecurityManager(sparkConf: SparkConf)
+private[spark] class SecurityManager(
+sparkConf: SparkConf,
+ioEncryptionKey: Option[Array[Byte]] = None)
   extends Logging with SecretKeyHolder {
 
   import SecurityManager._
@@ -415,6 +415,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
 logInfo("Changing acls enabled to: " + aclsOn)
   }
 
+  def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey
+
   /**
* Generates or looks up the secret key.
*
@@ -559,19 +561,4 @@ private[spark] object SecurityManager {
   // key used to store the spark secret in the Hadoop UGI
   val SECRET_LOOKUP_KEY = "sparkCookie"
 
-  /**
-   * Setup the cryptographic key used by IO encryption in credentials. The key 
is generated using
-   * [[KeyGenerator]]. The algorithm and key length is specified by the 

spark git commit: [SPARK-18588][SS][KAFKA] Ignore the flaky kafka test

2016-11-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 a0c1c699e -> 45e2b3c0e


[SPARK-18588][SS][KAFKA] Ignore the flaky kafka test

## What changes were proposed in this pull request?

Ignore the flaky test to unblock other PRs while I'm debugging it.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16051 from zsxwing/ignore-flaky-kafka-test.

(cherry picked from commit 1633ff3b6c97e33191859f34c868782cbb0972fd)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45e2b3c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45e2b3c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45e2b3c0

Branch: refs/heads/branch-2.1
Commit: 45e2b3c0e4cd5c6e1ce6d9c99950eda726d27250
Parents: a0c1c69
Author: Shixiong Zhu 
Authored: Mon Nov 28 21:04:20 2016 -0800
Committer: Shixiong Zhu 
Committed: Mon Nov 28 21:04:29 2016 -0800

--
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/45e2b3c0/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index f9f6258..e1af14f 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -838,7 +838,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends 
StreamTest with Shared
 }
   }
 
-  test("stress test for failOnDataLoss=false") {
+  ignore("stress test for failOnDataLoss=false") {
 val reader = spark
   .readStream
   .format("kafka")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18588][SS][KAFKA] Ignore the flaky kafka test

2016-11-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master e64a2047e -> 1633ff3b6


[SPARK-18588][SS][KAFKA] Ignore the flaky kafka test

## What changes were proposed in this pull request?

Ignore the flaky test to unblock other PRs while I'm debugging it.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16051 from zsxwing/ignore-flaky-kafka-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1633ff3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1633ff3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1633ff3b

Branch: refs/heads/master
Commit: 1633ff3b6c97e33191859f34c868782cbb0972fd
Parents: e64a204
Author: Shixiong Zhu 
Authored: Mon Nov 28 21:04:20 2016 -0800
Committer: Shixiong Zhu 
Committed: Mon Nov 28 21:04:20 2016 -0800

--
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1633ff3b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index f9f6258..e1af14f 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -838,7 +838,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends 
StreamTest with Shared
 }
   }
 
-  test("stress test for failOnDataLoss=false") {
+  ignore("stress test for failOnDataLoss=false") {
 val reader = spark
   .readStream
   .format("kafka")





spark git commit: [SPARK-18523][PYSPARK] Make SparkContext.stop more reliable

2016-11-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 cdf315ba1 -> c46928ff9


[SPARK-18523][PYSPARK] Make SparkContext.stop more reliable

## What changes were proposed in this pull request?

This PR fixes the broken state that a SparkContext can fall into if the Spark 
driver crashes or is killed by OOM.

## How was this patch tested?

1. Start SparkContext;
2. Find Spark driver process and `kill -9` it;
3. Call `sc.stop()`;
4. Create new SparkContext after that;

Without this patch you will crash on step 3 and won't be able to do step 4 
without manually resetting private attributes or restarting the IPython 
notebook / shell.
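
The failure mode in the steps above can be sketched without a JVM. This is
an illustration only: `GatewayError` is a hypothetical stand-in for
`py4j.protocol.Py4JError`, and `DeadJvmHandle` and `Context` are made-up
classes that mimic a context whose driver process has been killed.

```python
import warnings


class GatewayError(Exception):
    """Hypothetical stand-in for py4j.protocol.Py4JError."""


class DeadJvmHandle:
    """Mimics a handle to a JVM process that no longer exists."""

    def stop(self):
        raise GatewayError("connection refused")


class Context:
    def __init__(self, jvm_handle):
        self._jsc = jvm_handle

    def stop(self):
        if getattr(self, "_jsc", None):
            try:
                self._jsc.stop()
            except GatewayError:
                warnings.warn(
                    "Unable to cleanly shut down the JVM process.",
                    RuntimeWarning,
                )
            finally:
                # Always drop the handle so a new context can be created.
                self._jsc = None
```

With the `finally` clause, `stop()` on a dead handle emits a warning instead
of raising, and the private attribute is reset either way.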

Author: Alexander Shorin 

Closes #15961 from kxepal/18523-make-spark-context-stop-more-reliable.

(cherry picked from commit 71352c94ad2a60d1695bd7ac0f4452539270e10c)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c46928ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c46928ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c46928ff

Branch: refs/heads/branch-2.1
Commit: c46928ff97371421613720a0d8d7f2baaa64bb73
Parents: cdf315b
Author: Alexander Shorin 
Authored: Mon Nov 28 18:28:24 2016 -0800
Committer: Reynold Xin 
Committed: Mon Nov 28 18:28:29 2016 -0800

--
 python/pyspark/context.py | 17 +++--
 1 file changed, 15 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c46928ff/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 2fd3aee..5c4e79c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -26,6 +26,8 @@ import warnings
 from threading import RLock
 from tempfile import NamedTemporaryFile
 
+from py4j.protocol import Py4JError
+
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
 from pyspark.broadcast import Broadcast
@@ -373,8 +375,19 @@ class SparkContext(object):
 Shut down the SparkContext.
 """
 if getattr(self, "_jsc", None):
-self._jsc.stop()
-self._jsc = None
+try:
+self._jsc.stop()
+except Py4JError:
+# Case: SPARK-18523
+warnings.warn(
+'Unable to cleanly shutdown Spark JVM process.'
+' It is possible that the process has crashed,'
+' been killed or may also be in a zombie state.',
+RuntimeWarning
+)
+pass
+finally:
+self._jsc = None
 if getattr(self, "_accumulatorServer", None):
 self._accumulatorServer.shutdown()
 self._accumulatorServer = None





spark git commit: [SPARK-18403][SQL] Fix unsafe data false sharing issue in ObjectHashAggregateExec

2016-11-28 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 05f7c6ffa -> 2e809903d


[SPARK-18403][SQL] Fix unsafe data false sharing issue in 
ObjectHashAggregateExec

## What changes were proposed in this pull request?

This PR fixes a random OOM issue that occurred while running 
`ObjectHashAggregateSuite`.

This issue can be reliably reproduced under the following conditions:

1. The aggregation must be evaluated using `ObjectHashAggregateExec`;
2. There must be an input column whose data type involves `ArrayType` (an input 
column of `MapType` may even cause SIGSEGV);
3. Sort-based aggregation fallback must be triggered during evaluation.

The root cause is that, while falling back to sort-based aggregation, we must 
sort the already evaluated partial aggregation buffers living in the hash map 
and feed them to the sort-based aggregator using an external sorter. However, 
the underlying mutable byte buffer of the `UnsafeRow`s produced by the iterator 
of the external sorter is reused and may get overwritten when the iterator 
steps forward. After the last entry is consumed, the byte buffer points to a 
block of uninitialized memory filled with `5a` bytes. Therefore, while reading 
an `UnsafeArrayData` out of the `UnsafeRow`, `5a5a5a5a` is treated as the array 
size, which triggers a memory allocation for a ridiculously large array and 
immediately blows up the JVM with an OOM.

To fix this issue, we only need to add `.copy()` accordingly.
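
The aliasing problem can be reproduced in miniature. The following Python
sketch is an analogy, not Spark code: `reused_buffer_iterator` is a
hypothetical generator that recycles one mutable buffer the way the sorter
iterator is described above, and `bytes(buf)` plays the role of `.copy()`.

```python
def reused_buffer_iterator(rows):
    """Yield each row through one shared, mutable buffer."""
    buffer = bytearray(4)
    for row in rows:
        buffer[:] = row
        yield buffer
    # After the last entry, the buffer no longer holds valid row data.
    buffer[:] = b"\x5a" * 4


rows = [b"\x00\x00\x00\x01", b"\x00\x00\x00\x02"]

# Holding references: every element aliases the same buffer, so all of
# them see whatever was written last.
aliased = list(reused_buffer_iterator(rows))

# Copying on read: each element owns its bytes and stays valid.
copied = [bytes(buf) for buf in reused_buffer_iterator(rows)]
```

Reading a 4-byte size out of `aliased[0]` now yields `0x5a5a5a5a`, while
`copied` still holds the original rows.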

## How was this patch tested?

New regression test case added in `ObjectHashAggregateSuite`.

Author: Cheng Lian 

Closes #15976 from liancheng/investigate-oom.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e809903
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e809903
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e809903

Branch: refs/heads/master
Commit: 2e809903d459b5b5aa6fd882b5c4a0c915af4d43
Parents: 05f7c6f
Author: Cheng Lian 
Authored: Tue Nov 29 09:01:03 2016 +0800
Committer: Wenchen Fan 
Committed: Tue Nov 29 09:01:03 2016 +0800

--
 .../aggregate/ObjectAggregationIterator.scala   |  11 +-
 .../execution/ObjectHashAggregateSuite.scala| 164 +++
 2 files changed, 101 insertions(+), 74 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2e809903/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
index 3c7b9ee..3a7fcf1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
@@ -262,7 +262,9 @@ class SortBasedAggregator(
   // Firstly, update the aggregation buffer with input rows.
   while (hasNextInput &&
 groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 
0) {
-processRow(result.aggregationBuffer, inputIterator.getValue)
+// Since `inputIterator.getValue` is an `UnsafeRow` whose 
underlying buffer will be
+// overwritten when `inputIterator` steps forward, we need to do a 
deep copy here.
+processRow(result.aggregationBuffer, inputIterator.getValue.copy())
 hasNextInput = inputIterator.next()
   }
 
@@ -271,7 +273,12 @@ class SortBasedAggregator(
   // be called after calling processRow.
   while (hasNextAggBuffer &&
 groupingKeyOrdering.compare(initialAggBufferIterator.getKey, 
groupingKey) == 0) {
-mergeAggregationBuffers(result.aggregationBuffer, 
initialAggBufferIterator.getValue)
+mergeAggregationBuffers(
+  result.aggregationBuffer,
+  // Since `inputIterator.getValue` is an `UnsafeRow` whose 
underlying buffer will be
+  // overwritten when `inputIterator` steps forward, we need to do 
a deep copy here.
+  initialAggBufferIterator.getValue.copy()
+)
 hasNextAggBuffer = initialAggBufferIterator.next()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2e809903/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ObjectHashAggregateSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ObjectHashAggregateSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ObjectHashAggregateSuite.scala
index 

spark git commit: [SPARK-18408][ML] API Improvements for LSH

2016-11-28 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 75d73d13e -> cdf315ba1


[SPARK-18408][ML] API Improvements for LSH

## What changes were proposed in this pull request?

(1) Change output schema to `Array of Vector` instead of `Vectors`
(2) Use `numHashTables` as the dimension of Array
(3) Rename `RandomProjection` to `BucketedRandomProjectionLSH`, `MinHash` to 
`MinHashLSH`
(4) Make `randUnitVectors/randCoefficients` private
(5) Make Multi-Probe NN Search and `hashDistance` private for future discussion

Saved for future PRs:
(1) AND-amplification and `numHashFunctions` as the dimension of Vector are 
saved for a future PR.
(2) `hashDistance` and MultiProbe NN Search need more discussion. The current 
implementation is just a backward-compatible one.
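
A rough Python sketch of the output shape described in items (1) and (2) of
the proposed changes. It assumes the bucketed random projection hash is
`floor(x . v / bucketLength)` for a random unit vector `v`, one per hash
table; the helper names here are hypothetical and this is not the Spark
implementation.

```python
import math
import random


def random_unit_vector(dim, rng):
    """Draw a Gaussian vector and scale it to unit L2 norm."""
    v = [rng.gauss(0.0, 1.0) for _ in range(dim)]
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def hash_vector(x, unit_vectors, bucket_length):
    """Return one length-1 hash vector per hash table."""
    return [
        [float(math.floor(sum(a * b for a, b in zip(x, v)) / bucket_length))]
        for v in unit_vectors
    ]


rng = random.Random(42)
num_hash_tables = 3
unit_vectors = [random_unit_vector(2, rng) for _ in range(num_hash_tables)]
hashes = hash_vector([1.0, 2.0], unit_vectors, bucket_length=2.0)
```

The result is a list with `num_hash_tables` entries, each a vector of length
one, which leaves room for a longer inner vector if AND-amplification is
added later.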

## How was this patch tested?
Related unit tests are modified to make sure that the performance of LSH is 
maintained and that the outputs of the APIs meet expectations.

Author: Yun Ni 
Author: Yunni 

Closes #15874 from Yunni/SPARK-18408-yunn-api-improvements.

(cherry picked from commit 05f7c6ffab2a6be548375cd624dc27092677232f)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cdf315ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cdf315ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cdf315ba

Branch: refs/heads/branch-2.1
Commit: cdf315ba1bd732291f05756281070eb7aa4e123f
Parents: 75d73d1
Author: Yun Ni 
Authored: Mon Nov 28 15:14:46 2016 -0800
Committer: Joseph K. Bradley 
Committed: Mon Nov 28 15:14:55 2016 -0800

--
 .../feature/BucketedRandomProjectionLSH.scala   | 234 +++
 .../scala/org/apache/spark/ml/feature/LSH.scala | 138 ++-
 .../org/apache/spark/ml/feature/MinHash.scala   | 195 
 .../apache/spark/ml/feature/MinHashLSH.scala| 201 
 .../spark/ml/feature/RandomProjection.scala | 225 --
 .../BucketedRandomProjectionLSHSuite.scala  | 213 +
 .../org/apache/spark/ml/feature/LSHTest.scala   |  17 +-
 .../spark/ml/feature/MinHashLSHSuite.scala  | 161 +
 .../apache/spark/ml/feature/MinHashSuite.scala  | 126 --
 .../ml/feature/RandomProjectionSuite.scala  | 197 
 10 files changed, 896 insertions(+), 811 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cdf315ba/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
new file mode 100644
index 000..cbac163
--- /dev/null
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import scala.util.Random
+
+import breeze.linalg.normalize
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared.HasSeed
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+
+/**
+ * :: Experimental ::
+ *
+ * Params for [[BucketedRandomProjectionLSH]].
+ */
+private[ml] trait BucketedRandomProjectionLSHParams extends Params {
+
+  /**
+   * The length of each hash bucket, a larger bucket lowers the false negative 
rate. The number of
+   * buckets will be `(max L2 norm of input vectors) / bucketLength`.
+   *
+   *
+   * If input vectors are normalized, 1-10 times of pow(numRecords, 
-1/inputDim) would be a
+   * reasonable value
+   * @group param
+   */
+  val bucketLength: DoubleParam = 

spark git commit: [SPARK-18408][ML] API Improvements for LSH

2016-11-28 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 8b1609beb -> 05f7c6ffa


[SPARK-18408][ML] API Improvements for LSH

## What changes were proposed in this pull request?

(1) Change output schema to `Array of Vector` instead of `Vectors`
(2) Use `numHashTables` as the dimension of Array
(3) Rename `RandomProjection` to `BucketedRandomProjectionLSH`, `MinHash` to 
`MinHashLSH`
(4) Make `randUnitVectors/randCoefficients` private
(5) Make Multi-Probe NN Search and `hashDistance` private for future discussion

Saved for future PRs:
(1) AND-amplification and `numHashFunctions` as the dimension of Vector are 
saved for a future PR.
(2) `hashDistance` and MultiProbe NN Search need more discussion. The current 
implementation is just a backward-compatible one.

## How was this patch tested?
Related unit tests are modified to make sure that the performance of LSH is 
maintained and that the outputs of the APIs meet expectations.

Author: Yun Ni 
Author: Yunni 

Closes #15874 from Yunni/SPARK-18408-yunn-api-improvements.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05f7c6ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05f7c6ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05f7c6ff

Branch: refs/heads/master
Commit: 05f7c6ffab2a6be548375cd624dc27092677232f
Parents: 8b1609b
Author: Yun Ni 
Authored: Mon Nov 28 15:14:46 2016 -0800
Committer: Joseph K. Bradley 
Committed: Mon Nov 28 15:14:46 2016 -0800

--
 .../feature/BucketedRandomProjectionLSH.scala   | 234 +++
 .../scala/org/apache/spark/ml/feature/LSH.scala | 138 ++-
 .../org/apache/spark/ml/feature/MinHash.scala   | 195 
 .../apache/spark/ml/feature/MinHashLSH.scala| 201 
 .../spark/ml/feature/RandomProjection.scala | 225 --
 .../BucketedRandomProjectionLSHSuite.scala  | 213 +
 .../org/apache/spark/ml/feature/LSHTest.scala   |  17 +-
 .../spark/ml/feature/MinHashLSHSuite.scala  | 161 +
 .../apache/spark/ml/feature/MinHashSuite.scala  | 126 --
 .../ml/feature/RandomProjectionSuite.scala  | 197 
 10 files changed, 896 insertions(+), 811 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/05f7c6ff/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
new file mode 100644
index 000..cbac163
--- /dev/null
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import scala.util.Random
+
+import breeze.linalg.normalize
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared.HasSeed
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+
+/**
+ * :: Experimental ::
+ *
+ * Params for [[BucketedRandomProjectionLSH]].
+ */
+private[ml] trait BucketedRandomProjectionLSHParams extends Params {
+
+  /**
+   * The length of each hash bucket, a larger bucket lowers the false negative 
rate. The number of
+   * buckets will be `(max L2 norm of input vectors) / bucketLength`.
+   *
+   *
+   * If input vectors are normalized, 1-10 times of pow(numRecords, 
-1/inputDim) would be a
+   * reasonable value
+   * @group param
+   */
+  val bucketLength: DoubleParam = new DoubleParam(this, "bucketLength",
+"the length of each hash bucket, a larger bucket lowers the false negative 
rate.",
+

spark git commit: [SPARK-18553][CORE][BRANCH-2.0] Fix leak of TaskSetManager following executor loss

2016-11-28 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f158045fd -> 9ff03fa23


[SPARK-18553][CORE][BRANCH-2.0] Fix leak of TaskSetManager following executor 
loss

## What changes were proposed in this pull request?

This patch fixes a critical resource leak in the TaskScheduler which could 
cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor 
with running tasks is permanently lost and the associated stage fails.

This problem was originally identified by analyzing the heap dump of a driver 
belonging to a cluster that had run out of shuffle space. This dump contained 
several `ShuffleDependency` instances that were retained by `TaskSetManager`s 
inside the scheduler but were not otherwise referenced. Each of these 
`TaskSetManager`s was considered a "zombie" but had no running tasks and 
therefore should have been cleaned up. However, these zombie task sets were 
still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map.

Entries are added to the `taskIdToTaskSetManager` map when tasks are launched 
and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by 
the scheduler backend while processing `StatusUpdate` messages from executors. 
The problem with this design is that a completely dead executor will never send 
a `StatusUpdate`. There is [some 
code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338)
 in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` 
state (which is supposed to correspond to a task failure triggered by total 
executor loss), but this state only seems to be used in Mesos fine-grained 
mode. There doesn't seem to be any code which performs per-task state cleanup 
for tasks that were running on an executor that completely disappears without 
sending any sort of final death message. The `executorLost` and 
[`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527)
 methods don't appear to perform any cleanup of the `taskId -> *` mappings, 
causing the leaks observed here.

This patch's fix is to maintain an `executorId -> running task ids` mapping so 
that these `taskId -> *` maps can be properly cleaned up following an executor 
loss.
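
The bookkeeping can be sketched as follows. This is a simplified Python
illustration under assumed names (`TaskBookkeeping` and its methods are
hypothetical), not the `TaskSchedulerImpl` code: the extra executor-keyed map
lets the per-task entries be removed even when no status update ever arrives.

```python
class TaskBookkeeping:
    def __init__(self):
        self.task_id_to_task_set = {}
        self.task_id_to_executor = {}
        self.executor_to_running_tasks = {}

    def task_launched(self, task_id, executor_id, task_set):
        self.task_id_to_task_set[task_id] = task_set
        self.task_id_to_executor[task_id] = executor_id
        self.executor_to_running_tasks.setdefault(executor_id, set()).add(task_id)

    def task_finished(self, task_id):
        # Normal path: the executor reported a final status for the task.
        self.task_id_to_task_set.pop(task_id, None)
        executor_id = self.task_id_to_executor.pop(task_id, None)
        running = self.executor_to_running_tasks.get(executor_id)
        if running is not None:
            running.discard(task_id)

    def executor_lost(self, executor_id):
        # A dead executor sends nothing, so clean up on its behalf.
        for task_id in self.executor_to_running_tasks.pop(executor_id, set()):
            self.task_id_to_task_set.pop(task_id, None)
            self.task_id_to_executor.pop(task_id, None)
```

After `executor_lost("e1")`, no entry keyed by a task that ran on `e1`
remains, so the task set those entries referenced can be garbage collected.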

There are some potential corner-case interactions that I'm concerned about 
here, especially some details in [the 
comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523)
 in `removeExecutor`, so I'd appreciate a very careful review of these changes.

This PR is opened against branch-2.0, where I first observed this problem, but 
will also need to be fixed in master, branch-2.1, and branch-1.6 (which I'll do 
in followup PRs after this fix is reviewed and merged).

## How was this patch tested?

I added a new unit test to `TaskSchedulerImplSuite`. You can check out this PR 
as of 25e455e711b978cd331ee0f484f70fde31307634 to see the failing test.

cc kayousterhout, markhamstra, rxin for review.

Author: Josh Rosen 

Closes #15986 from JoshRosen/fix-leak-following-total-executor-loss.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ff03fa2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ff03fa2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ff03fa2

Branch: refs/heads/branch-2.0
Commit: 9ff03fa23e664bc0241914c7b5a7bda0c38eec15
Parents: f158045
Author: Josh Rosen 
Authored: Mon Nov 28 13:17:24 2016 -0800
Committer: Josh Rosen 
Committed: Mon Nov 28 13:17:24 2016 -0800

--
 .../spark/scheduler/TaskSchedulerImpl.scala | 80 
 .../StandaloneDynamicAllocationSuite.scala  |  7 +-
 .../scheduler/TaskSchedulerImplSuite.scala  | 68 +
 3 files changed, 120 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9ff03fa2/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d22321b..b2ef41e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -88,10 +88,12 @@ private[spark] class TaskSchedulerImpl(
   // Incrementing task IDs
   val nextTaskId = new AtomicLong(0)
 
-  // Number of tasks running on each executor
-  private 

spark git commit: [SPARK-18117][CORE] Add test for TaskSetBlacklist

2016-11-28 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master ad67993b7 -> 8b1609beb


[SPARK-18117][CORE] Add test for TaskSetBlacklist

## What changes were proposed in this pull request?

This adds tests to verify the interaction between TaskSetBlacklist and
TaskSchedulerImpl.  TaskSetBlacklist was introduced by SPARK-17675 but
it neglected to add these tests.

This change does not fix any bugs -- it is just for increasing test
coverage.

## How was this patch tested?

Jenkins

Author: Imran Rashid 

Closes #15644 from squito/taskset_blacklist_test_update.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b1609be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b1609be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b1609be

Branch: refs/heads/master
Commit: 8b1609bebe489b2ef78db4be6e9836687089fe3d
Parents: ad67993
Author: Imran Rashid 
Authored: Mon Nov 28 13:47:09 2016 -0600
Committer: Imran Rashid 
Committed: Mon Nov 28 13:47:09 2016 -0600

--
 .../apache/spark/scheduler/TaskSetManager.scala |   2 +-
 .../scheduler/TaskSchedulerImplSuite.scala  | 254 ++-
 .../spark/scheduler/TaskSetManagerSuite.scala   |  45 +++-
 3 files changed, 292 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8b1609be/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b766e41..f2a432c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -84,7 +84,7 @@ private[spark] class TaskSetManager(
   var totalResultSize = 0L
   var calculatedTasks = 0
 
-  private val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
+  private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
 if (BlacklistTracker.isBlacklistEnabled(conf)) {
   Some(new TaskSetBlacklist(conf, stageId, clock))
 } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/8b1609be/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index f5f1947..5dc7708 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -17,7 +17,12 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.HashMap
+
+import org.mockito.Matchers.{anyInt, anyString, eq => meq}
+import org.mockito.Mockito.{atLeast, atMost, never, spy, verify, when}
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -31,7 +36,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
 }
 
 class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with 
BeforeAndAfterEach
-with Logging {
+with Logging with MockitoSugar {
 
   var failedTaskSetException: Option[Throwable] = None
   var failedTaskSetReason: String = null
@@ -40,11 +45,16 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
   var taskScheduler: TaskSchedulerImpl = null
   var dagScheduler: DAGScheduler = null
 
+  val stageToMockTaskSetBlacklist = new HashMap[Int, TaskSetBlacklist]()
+  val stageToMockTaskSetManager = new HashMap[Int, TaskSetManager]()
+
   override def beforeEach(): Unit = {
 super.beforeEach()
 failedTaskSet = false
 failedTaskSetException = None
 failedTaskSetReason = null
+stageToMockTaskSetBlacklist.clear()
+stageToMockTaskSetManager.clear()
   }
 
   override def afterEach(): Unit = {
@@ -66,6 +76,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 }
 sc = new SparkContext(conf)
 taskScheduler = new TaskSchedulerImpl(sc)
+setupHelper()
+  }
+
+  def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = {
+val conf = new 
SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
+conf.set(config.BLACKLIST_ENABLED, true)
+sc = new SparkContext(conf)
+taskScheduler =
+  new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
+override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): 
TaskSetManager = {
+  val 

[1/2] spark git commit: Preparing Spark release v2.1.0-rc1

2016-11-28 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 b386943b2 -> 75d73d13e


Preparing Spark release v2.1.0-rc1


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80aabc0b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80aabc0b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80aabc0b

Branch: refs/heads/branch-2.1
Commit: 80aabc0bd33dc5661a90133156247e7a8c1bf7f5
Parents: b386943
Author: Patrick Wendell 
Authored: Mon Nov 28 11:48:12 2016 -0800
Committer: Patrick Wendell 
Committed: Mon Nov 28 11:48:12 2016 -0800

--
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 2 +-
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10-sql/pom.xml   | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mesos/pom.xml | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 python/pyspark/version.py | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 38 files changed, 38 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/80aabc0b/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index ec243ea..aebfd12 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.0-SNAPSHOT
+2.1.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/80aabc0b/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index fcefe64..67d78d5 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.0-SNAPSHOT
+2.1.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/80aabc0b/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 511e1f2..9379097 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.0-SNAPSHOT
+2.1.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/80aabc0b/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 606ad15..53cb8dd 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.0-SNAPSHOT
+2.1.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/80aabc0b/common/sketch/pom.xml
--
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index 626f023..89bee85 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.0-SNAPSHOT
+2.1.0
 ../../pom.xml
   
 


[2/2] spark git commit: Preparing development version 2.1.1-SNAPSHOT

2016-11-28 Thread pwendell
Preparing development version 2.1.1-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75d73d13
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75d73d13
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75d73d13

Branch: refs/heads/branch-2.1
Commit: 75d73d13e82aa88a7043d60b041b97fdb19e49b9
Parents: 80aabc0
Author: Patrick Wendell 
Authored: Mon Nov 28 11:48:21 2016 -0800
Committer: Patrick Wendell 
Committed: Mon Nov 28 11:48:21 2016 -0800

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 4 ++--
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10-sql/pom.xml   | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mesos/pom.xml | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 python/pyspark/version.py | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 39 files changed, 40 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/75d73d13/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 981ae12..46fb178 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.1.0
+Version: 2.1.1
 Date: 2016-11-06
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
 email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/75d73d13/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index aebfd12..29522fd 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.0
+2.1.1-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/75d73d13/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 67d78d5..85644c4 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.0
+2.1.1-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/75d73d13/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 9379097..e15ede9 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.0
+2.1.1-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/75d73d13/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 53cb8dd..c93a355 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.0
+  

[spark] Git Push Summary

2016-11-28 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v2.1.0-rc1 [created] 80aabc0bd




spark git commit: [SPARK-18597][SQL] Do not push-down join conditions to the left side of a Left Anti join [BRANCH-2.0]

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 759bd4a6a -> f158045fd


[SPARK-18597][SQL] Do not push-down join conditions to the left side of a Left 
Anti join [BRANCH-2.0]

## What changes were proposed in this pull request?
We currently push down join conditions of a Left Anti join to both sides of the 
join. This is similar to Inner, Left Semi and Existence (a specialized left 
semi) join. The problem is that this changes the semantics of the join: a left 
anti join filters out the left rows that match the join condition.

This PR fixes this by only pushing down conditions to the right hand side of 
the join. This is similar to the behavior of left outer join.
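
The semantic difference can be illustrated with plain Scala collections. This
is only a sketch, not the optimizer code: the `leftAnti` helper, the row shape
`(id, value)` and the sample data are all hypothetical and exist purely to show
why a left-only condition cannot be turned into a filter on the left input,
while a right-only condition can be turned into a filter on the right input.

```scala
object LeftAntiPushdownSketch {
  // Hypothetical row shape used only for this illustration: (id, value).
  type Row = (Int, Int)

  // Left anti join: keep each left row for which NO right row satisfies cond.
  def leftAnti(left: Seq[Row], right: Seq[Row])(cond: (Row, Row) => Boolean): Seq[Row] =
    left.filter(l => !right.exists(r => cond(l, r)))

  val left: Seq[Row] = Seq((1, 10), (2, 20), (3, 30))
  val right: Seq[Row] = Seq((1, 0), (2, 5))

  // Join condition with a left-only conjunct: l.id = r.id AND l.value > 15.
  // Row (1, 10) fails the left-only conjunct, so it matches nothing and is kept.
  val correct: Seq[Row] =
    leftAnti(left, right)((l, r) => l._1 == r._1 && l._2 > 15)
  // correct == Seq((1, 10), (3, 30))

  // Pushing the left-only conjunct down as a filter on the left input
  // removes (1, 10) before the join, which changes the result.
  val pushedLeft: Seq[Row] =
    leftAnti(left.filter(_._2 > 15), right)((l, r) => l._1 == r._1)
  // pushedLeft == Seq((3, 30))

  // A right-only conjunct (r.value = 0) can be pushed to the right input:
  // it only shrinks the set of candidate matches, which is what the
  // conjunct would have done inside the join condition anyway.
  val rightUnpushed: Seq[Row] =
    leftAnti(left, right)((l, r) => l._1 == r._1 && r._2 == 0)
  val rightPushed: Seq[Row] =
    leftAnti(left, right.filter(_._2 == 0))((l, r) => l._1 == r._1)
  // both == Seq((2, 20), (3, 30))
}
```

In this sketch the left-side pushdown silently drops a row that the join should
have returned, whereas the right-side pushdown leaves the result unchanged.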

This PR is a backport of https://github.com/apache/spark/pull/16026

## How was this patch tested?
Added tests to `FilterPushdownSuite.scala` and created a SQLQueryTestSuite file 
for left anti joins with a regression test.

Author: Herman van Hovell 

Closes #16039 from hvanhovell/SPARK-18597-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f158045f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f158045f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f158045f

Branch: refs/heads/branch-2.0
Commit: f158045fde14b3018493e9059e9dcb2095f11c54
Parents: 759bd4a
Author: Herman van Hovell 
Authored: Mon Nov 28 11:20:59 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 11:20:59 2016 -0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  |  6 ++--
 .../optimizer/FilterPushdownSuite.scala | 33 
 .../resources/sql-tests/inputs/anti-join.sql|  7 +
 .../sql-tests/results/anti-join.sql.out | 29 +
 4 files changed, 72 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f158045f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0a28ef4..3a71463 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1289,7 +1289,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
   joinType match {
-case Inner | LeftExistence(_) =>
+case Inner |  LeftSemi | ExistenceJoin(_) =>
   // push down the single side only join filter for both sides sub queries
   val newLeft = leftJoinConditions.
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -1306,14 +1306,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
 
   Join(newLeft, newRight, RightOuter, newJoinCond)
-case LeftOuter =>
+case LeftOuter | LeftAnti =>
   // push down the right side only join filter for right sub query
   val newLeft = left
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
   val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
 
-  Join(newLeft, newRight, LeftOuter, newJoinCond)
+  Join(newLeft, newRight, joinType, newJoinCond)
 case FullOuter => j
 case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
 case UsingJoin(_, _) => sys.error("Untransformed Using join node")

http://git-wip-us.apache.org/repos/asf/spark/blob/f158045f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 019f132..3e67282 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
   }
 
+  test("joins: push down where clause into left anti join") {
+val 

spark git commit: [SPARK-17680][SQL][TEST] Added test cases for InMemoryRelation

2016-11-28 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 81e3f9711 -> b386943b2


[SPARK-17680][SQL][TEST] Added test cases for InMemoryRelation

## What changes were proposed in this pull request?

This pull request adds test cases for the following:
- keep all data types, with and without nulls
- access `CachedBatch` with whole-stage codegen disabled
- access only some columns in `CachedBatch`

This PR is part of https://github.com/apache/spark/pull/15219. The motivation 
for these tests is as follows: when https://github.com/apache/spark/pull/15219 
is enabled, the first two cases are handled by specialized (generated) code. 
The third case is a pitfall.

Even without that change, these tests are useful for increasing test coverage.

## How was this patch tested?

By the added tests themselves.

Author: Kazuaki Ishizaki 

Closes #15462 from kiszk/columnartestsuites.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b386943b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b386943b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b386943b

Branch: refs/heads/branch-2.1
Commit: b386943b2fe6af5237270bfa520295c1711bb341
Parents: 81e3f97
Author: Kazuaki Ishizaki 
Authored: Mon Nov 28 14:06:37 2016 -0500
Committer: Andrew Or 
Committed: Mon Nov 28 14:07:34 2016 -0500

--
 .../columnar/InMemoryColumnarQuerySuite.scala   | 148 ++-
 1 file changed, 146 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b386943b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index b272c8e..afeb478 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -20,18 +20,96 @@ package org.apache.spark.sql.execution.columnar
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types._
-import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
+import org.apache.spark.storage.StorageLevel._
 
 class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
   setupTestData()
 
+  private def cachePrimitiveTest(data: DataFrame, dataType: String) {
+data.createOrReplaceTempView(s"testData$dataType")
+val storageLevel = MEMORY_ONLY
+val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
+val inMemoryRelation = InMemoryRelation(useCompression = true, 5, 
storageLevel, plan, None)
+
+assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == 
storageLevel)
+inMemoryRelation.cachedColumnBuffers.collect().head match {
+  case _: CachedBatch =>
+  case other => fail(s"Unexpected cached batch type: 
${other.getClass.getName}")
+}
+checkAnswer(inMemoryRelation, data.collect().toSeq)
+  }
+
+  private def testPrimitiveType(nullability: Boolean): Unit = {
+val dataTypes = Seq(BooleanType, ByteType, ShortType, IntegerType, 
LongType,
+  FloatType, DoubleType, DateType, TimestampType, DecimalType(25, 5), 
DecimalType(6, 5))
+val schema = StructType(dataTypes.zipWithIndex.map { case (dataType, 
index) =>
+  StructField(s"col$index", dataType, nullability)
+})
+val rdd = spark.sparkContext.parallelize((1 to 10).map(i => Row(
+  if (nullability && i % 3 == 0) null else if (i % 2 == 0) true else false,
+  if (nullability && i % 3 == 0) null else i.toByte,
+  if (nullability && i % 3 == 0) null else i.toShort,
+  if (nullability && i % 3 == 0) null else i.toInt,
+  if (nullability && i % 3 == 0) null else i.toLong,
+  if (nullability && i % 3 == 0) null else (i + 0.25).toFloat,
+  if (nullability && i % 3 == 0) null else (i + 0.75).toDouble,
+  if (nullability && i % 3 == 0) null else new Date(i),
+  if (nullability && i % 3 == 0) null else new Timestamp(i * 100L),
+  if (nullability && i % 3 == 0) null else 
BigDecimal(Long.MaxValue.toString + ".12345"),
+  if (nullability && i % 3 == 0) null
+  else new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456")
+)))
+

spark git commit: [SPARK-16282][SQL] Implement percentile SQL function.

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 4d7947856 -> 81e3f9711


[SPARK-16282][SQL] Implement percentile SQL function.

## What changes were proposed in this pull request?

Implement the `percentile` SQL function. It computes the exact percentile(s) of 
`expr` at the given percentage(s) `pc`, each in the range [0, 1].
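
As a rough illustration of what "exact percentile" means, here is a minimal
sketch over an in-memory sequence. It is not the `Percentile` expression added
by this patch. The object name, the `percentile` helper and the choice of
linear interpolation between the two closest ranks are assumptions made for
the example; the commit message itself only states that the result is exact
and that the percentage lies in [0, 1].

```scala
object ExactPercentileSketch {
  // Exact percentile of `values` at percentage `p` in [0.0, 1.0].
  // Assumption: linear interpolation between the two closest ranks,
  // with the fractional position computed as (n - 1) * p.
  def percentile(values: Seq[Double], p: Double): Option[Double] = {
    require(p >= 0.0 && p <= 1.0, s"percentage must be in [0.0, 1.0], got $p")
    if (values.isEmpty) {
      None // nothing to aggregate
    } else {
      val sorted = values.sorted
      val position = (sorted.length - 1) * p
      val lower = math.floor(position).toInt
      val upper = math.ceil(position).toInt
      val fraction = position - lower
      // When position is an integer, lower == upper and fraction == 0.
      Some(sorted(lower) + (sorted(upper) - sorted(lower)) * fraction)
    }
  }
}
```

For example, under these assumptions
`ExactPercentileSketch.percentile(Seq(1.0, 2.0, 3.0, 4.0), 0.5)` yields
`Some(2.5)`, and an empty input yields `None`. Unlike an approximate
percentile, this requires all values to be available and sorted.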

## How was this patch tested?

Added a new test suite, `PercentileSuite`, to test percentile directly.
Updated the related test cases in `ExpressionToSQLSuite`.

Author: jiangxingbo 
Author: 蒋星博 

Closes #14136 from jiangxb1987/percentile.

(cherry picked from commit 0f5f52a3d1e5dcf5b970c49e324e322b9deb00f3)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81e3f971
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81e3f971
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81e3f971

Branch: refs/heads/branch-2.1
Commit: 81e3f9711da5758fdeb297fe057685f648b6458b
Parents: 4d79478
Author: jiangxingbo 
Authored: Mon Nov 28 11:05:58 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 11:06:12 2016 -0800

--
 .../catalyst/analysis/FunctionRegistry.scala|   1 +
 .../expressions/aggregate/Percentile.scala  | 269 +++
 .../expressions/aggregate/PercentileSuite.scala | 245 +
 .../spark/sql/hive/HiveSessionCatalog.scala |   3 +-
 .../sql/catalyst/ExpressionToSQLSuite.scala |   2 +
 5 files changed, 518 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81e3f971/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 007cdc1..2636afe 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -249,6 +249,7 @@ object FunctionRegistry {
 expression[Max]("max"),
 expression[Average]("mean"),
 expression[Min]("min"),
+expression[Percentile]("percentile"),
 expression[Skewness]("skewness"),
 expression[ApproximatePercentile]("percentile_approx"),
 expression[StddevSamp]("std"),

http://git-wip-us.apache.org/repos/asf/spark/blob/81e3f971/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
new file mode 100644
index 000..356e088
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, 
DataOutputStream}
+import java.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.OpenHashMap
+
+/**
+ * The Percentile aggregate function returns the exact percentile(s) of 
numeric column `expr` at
+ * the given percentage(s) with value 

spark git commit: [SPARK-17680][SQL][TEST] Added test cases for InMemoryRelation

2016-11-28 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 0f5f52a3d -> ad67993b7


[SPARK-17680][SQL][TEST] Added test cases for InMemoryRelation

## What changes were proposed in this pull request?

This pull request adds test cases for the following:
- keep all data types, with and without nulls
- access `CachedBatch` with whole-stage codegen disabled
- access only some columns in `CachedBatch`

This PR is part of https://github.com/apache/spark/pull/15219. The motivation 
for these tests is as follows: when https://github.com/apache/spark/pull/15219 
is enabled, the first two cases are handled by specialized (generated) code. 
The third case is a pitfall.

Even without that change, these tests are useful for increasing test coverage.

## How was this patch tested?

By the added tests themselves.

Author: Kazuaki Ishizaki 

Closes #15462 from kiszk/columnartestsuites.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad67993b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad67993b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad67993b

Branch: refs/heads/master
Commit: ad67993b73490a24e7012df23810dab1712e7689
Parents: 0f5f52a
Author: Kazuaki Ishizaki 
Authored: Mon Nov 28 14:06:37 2016 -0500
Committer: Andrew Or 
Committed: Mon Nov 28 14:06:37 2016 -0500

--
 .../columnar/InMemoryColumnarQuerySuite.scala   | 148 ++-
 1 file changed, 146 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ad67993b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index b272c8e..afeb478 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -20,18 +20,96 @@ package org.apache.spark.sql.execution.columnar
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types._
-import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
+import org.apache.spark.storage.StorageLevel._
 
 class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
   setupTestData()
 
+  private def cachePrimitiveTest(data: DataFrame, dataType: String) {
+data.createOrReplaceTempView(s"testData$dataType")
+val storageLevel = MEMORY_ONLY
+val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
+val inMemoryRelation = InMemoryRelation(useCompression = true, 5, 
storageLevel, plan, None)
+
+assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == 
storageLevel)
+inMemoryRelation.cachedColumnBuffers.collect().head match {
+  case _: CachedBatch =>
+  case other => fail(s"Unexpected cached batch type: 
${other.getClass.getName}")
+}
+checkAnswer(inMemoryRelation, data.collect().toSeq)
+  }
+
+  private def testPrimitiveType(nullability: Boolean): Unit = {
+val dataTypes = Seq(BooleanType, ByteType, ShortType, IntegerType, 
LongType,
+  FloatType, DoubleType, DateType, TimestampType, DecimalType(25, 5), 
DecimalType(6, 5))
+val schema = StructType(dataTypes.zipWithIndex.map { case (dataType, 
index) =>
+  StructField(s"col$index", dataType, nullability)
+})
+val rdd = spark.sparkContext.parallelize((1 to 10).map(i => Row(
+  if (nullability && i % 3 == 0) null else if (i % 2 == 0) true else false,
+  if (nullability && i % 3 == 0) null else i.toByte,
+  if (nullability && i % 3 == 0) null else i.toShort,
+  if (nullability && i % 3 == 0) null else i.toInt,
+  if (nullability && i % 3 == 0) null else i.toLong,
+  if (nullability && i % 3 == 0) null else (i + 0.25).toFloat,
+  if (nullability && i % 3 == 0) null else (i + 0.75).toDouble,
+  if (nullability && i % 3 == 0) null else new Date(i),
+  if (nullability && i % 3 == 0) null else new Timestamp(i * 100L),
+  if (nullability && i % 3 == 0) null else 
BigDecimal(Long.MaxValue.toString + ".12345"),
+  if (nullability && i % 3 == 0) null
+  else new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456")
+)))
+

spark git commit: [SPARK-16282][SQL] Implement percentile SQL function.

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 185642846 -> 0f5f52a3d


[SPARK-16282][SQL] Implement percentile SQL function.

## What changes were proposed in this pull request?

Implement the `percentile` SQL function. It computes the exact percentile(s) of 
`expr` at the given percentage(s) `pc`, each in the range [0, 1].

## How was this patch tested?

Added a new test suite, `PercentileSuite`, to test percentile directly.
Updated the related test cases in `ExpressionToSQLSuite`.

Author: jiangxingbo 
Author: 蒋星博 

Closes #14136 from jiangxb1987/percentile.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f5f52a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f5f52a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f5f52a3

Branch: refs/heads/master
Commit: 0f5f52a3d1e5dcf5b970c49e324e322b9deb00f3
Parents: 1856428
Author: jiangxingbo 
Authored: Mon Nov 28 11:05:58 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 11:05:58 2016 -0800

--
 .../catalyst/analysis/FunctionRegistry.scala|   1 +
 .../expressions/aggregate/Percentile.scala  | 269 +++
 .../expressions/aggregate/PercentileSuite.scala | 245 +
 .../spark/sql/hive/HiveSessionCatalog.scala |   3 +-
 .../sql/catalyst/ExpressionToSQLSuite.scala |   2 +
 5 files changed, 518 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0f5f52a3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 007cdc1..2636afe 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -249,6 +249,7 @@ object FunctionRegistry {
 expression[Max]("max"),
 expression[Average]("mean"),
 expression[Min]("min"),
+expression[Percentile]("percentile"),
 expression[Skewness]("skewness"),
 expression[ApproximatePercentile]("percentile_approx"),
 expression[StddevSamp]("std"),

http://git-wip-us.apache.org/repos/asf/spark/blob/0f5f52a3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
new file mode 100644
index 000..356e088
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, 
DataOutputStream}
+import java.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.OpenHashMap
+
+/**
+ * The Percentile aggregate function returns the exact percentile(s) of 
numeric column `expr` at
+ * the given percentage(s) with value range in [0.0, 1.0].
+ *
+ * The operator is bound to the slower sort based aggregation path because the 
number of elements
+ * and their 

spark git commit: [SQL][MINOR] DESC should use 'Catalog' as partition provider

2016-11-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master eba727757 -> 185642846


[SQL][MINOR] DESC should use 'Catalog' as partition provider

## What changes were proposed in this pull request?

`CatalogTable` has a parameter named `tracksPartitionsInCatalog`, and in 
`CatalogTable.toString` we use `"Partition Provider: Catalog"` to represent it. 
This PR fixes `DESC TABLE` to make it consistent with `CatalogTable.toString`.

## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #16035 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18564284
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18564284
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18564284

Branch: refs/heads/master
Commit: 185642846e25fa812f9c7f398ab20bffc1e25273
Parents: eba7277
Author: Wenchen Fan 
Authored: Mon Nov 28 10:57:17 2016 -0800
Committer: Reynold Xin 
Committed: Mon Nov 28 10:57:17 2016 -0800

--
 .../main/scala/org/apache/spark/sql/execution/command/tables.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/18564284/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index ca4d20a..57d66f1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -489,7 +489,7 @@ case class DescribeTableCommand(
 if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, 
buffer)
 
 if (DDLUtils.isDatasourceTable(table) && table.tracksPartitionsInCatalog) {
-  append(buffer, "Partition Provider:", "Hive", "")
+  append(buffer, "Partition Provider:", "Catalog", "")
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino

2016-11-28 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 32b259fae -> 34ad4d520


[SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 
to match the version of org.codehaus.janino:janino

## What changes were proposed in this pull request?
org.codehaus.janino:janino depends on org.codehaus.janino:commons-compiler, and 
we have upgraded to org.codehaus.janino:janino 3.0.0.

However, it seems we are still pulling in org.codehaus.janino:commons-compiler 
2.7.6 because of calcite. It looks like an accident because we exclude janino 
from calcite (see here 
https://github.com/apache/spark/blob/branch-2.1/pom.xml#L1759). So, this PR 
upgrades org.codehaus.janino:commons-compiler to 3.0.0.

## How was this patch tested?
jenkins

Author: Yin Huai 

Closes #16025 from yhuai/janino-commons-compile.

(cherry picked from commit eba727757ed5dc23c635e1926795aea62ec0fc66)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34ad4d52
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34ad4d52
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34ad4d52

Branch: refs/heads/branch-2.1
Commit: 34ad4d520ae0e4302972097c5985ab2c5a8d5e04
Parents: 32b259f
Author: Yin Huai 
Authored: Mon Nov 28 10:09:30 2016 -0800
Committer: Yin Huai 
Committed: Mon Nov 28 10:09:50 2016 -0800

--
 dev/deps/spark-deps-hadoop-2.2 | 2 +-
 dev/deps/spark-deps-hadoop-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-2.4 | 2 +-
 dev/deps/spark-deps-hadoop-2.6 | 2 +-
 dev/deps/spark-deps-hadoop-2.7 | 2 +-
 pom.xml| 9 +
 sql/catalyst/pom.xml   | 4 
 7 files changed, 18 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.2
--
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index bbdea06..89bfcef 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -24,7 +24,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.3
--
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index a2dec41..8df3858 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.4
--
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index c1f02b9..71e7fb6 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 4f04636..ba31391 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index da3af9f..b129e5a 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 

spark git commit: [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino

2016-11-28 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 237c3b964 -> eba727757


[SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 
to match the version of org.codehaus.janino:janino

## What changes were proposed in this pull request?
org.codehaus.janino:janino depends on org.codehaus.janino:commons-compiler, and 
we have upgraded to org.codehaus.janino:janino 3.0.0.

However, it seems we are still pulling in org.codehaus.janino:commons-compiler 
2.7.6 because of calcite. It looks like an accident because we exclude janino 
from calcite (see here 
https://github.com/apache/spark/blob/branch-2.1/pom.xml#L1759). So, this PR 
upgrades org.codehaus.janino:commons-compiler to 3.0.0.

## How was this patch tested?
jenkins

Author: Yin Huai 

Closes #16025 from yhuai/janino-commons-compile.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eba72775
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eba72775
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eba72775

Branch: refs/heads/master
Commit: eba727757ed5dc23c635e1926795aea62ec0fc66
Parents: 237c3b9
Author: Yin Huai 
Authored: Mon Nov 28 10:09:30 2016 -0800
Committer: Yin Huai 
Committed: Mon Nov 28 10:09:30 2016 -0800

--
 dev/deps/spark-deps-hadoop-2.2 | 2 +-
 dev/deps/spark-deps-hadoop-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-2.4 | 2 +-
 dev/deps/spark-deps-hadoop-2.6 | 2 +-
 dev/deps/spark-deps-hadoop-2.7 | 2 +-
 pom.xml| 9 +
 sql/catalyst/pom.xml   | 4 
 7 files changed, 18 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.2
--
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index bbdea06..89bfcef 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -24,7 +24,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.3
--
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index a2dec41..8df3858 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.4
--
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index c1f02b9..71e7fb6 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 4f04636..ba31391 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index da3af9f..b129e5a 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/pom.xml

spark git commit: [SPARK-18535][UI][YARN] Redact sensitive information from Spark logs and UI

2016-11-28 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master d31ff9b7c -> 237c3b964


[SPARK-18535][UI][YARN] Redact sensitive information from Spark logs and UI

## What changes were proposed in this pull request?

This patch adds a new property called `spark.redaction.regex` that
allows users to specify a Scala regex to decide which Spark configuration
properties and environment variables in driver and executor environments
contain sensitive information. When this regex matches the property or
environment variable name, its value is redacted from the environment UI and
various logs like YARN and event logs.

This change uses this property to redact information from event logs and YARN
logs. It also updates the UI code to honor this property instead of
hardcoding the logic that decides which properties are sensitive.
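
The name-based matching described above can be sketched as follows. This is a minimal illustration, not the helper added by this patch: the object `RedactionSketch`, the method `redact`, and the replacement text are hypothetical names chosen for the example. Only the default pattern `(?i)secret|password` is taken from the diff.

```scala
import scala.util.matching.Regex

object RedactionSketch {
  // Replacement text is an assumption made for this sketch.
  val Replacement = "*********(redacted)"

  // Replace the value of every pair whose key matches the pattern anywhere
  // in the name; all other pairs pass through unchanged.
  def redact(pattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] =
    kvs.map { case (key, value) =>
      if (pattern.findFirstIn(key).isDefined) (key, Replacement) else (key, value)
    }
}
```

Because the match is made on the property or variable name rather than its value, a key such as `HADOOP_CREDSTORE_PASSWORD` is redacted regardless of what the value looks like.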

Here's an image of the UI post-redaction:
![image](https://cloud.githubusercontent.com/assets/1709451/20506215/4cc30654-b007-11e6-8aee-4cde253fba2f.png)

Here's the text in the YARN logs, post-redaction:
``HADOOP_CREDSTORE_PASSWORD -> *(redacted)``

Here's the text in the event logs, post-redaction:
``...,"spark.executorEnv.HADOOP_CREDSTORE_PASSWORD":"*(redacted)","spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD":"*(redacted)",...``

## How was this patch tested?
1. Unit tests are added to ensure that redaction works.
2. A YARN job was run that read data from S3, with confidential information
(the Hadoop credential provider password) provided in the environment
variables of the driver and executors. Afterwards, the logs were grepped to make
sure that the secret password was not mentioned. It was also ensured that
the job could read the data from S3 correctly, confirming that
the sensitive information was still passed down to the right places to read
the data.
3. The event logs were checked to make sure no mention of secret password was
present.
4. UI environment tab was checked to make sure there was no secret information
being displayed.

Author: Mark Grover 

Closes #15971 from markgrover/master_redaction.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/237c3b96
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/237c3b96
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/237c3b96

Branch: refs/heads/master
Commit: 237c3b9642a1a7c5e7884824b21877590d5d0b3b
Parents: d31ff9b
Author: Mark Grover 
Authored: Mon Nov 28 08:59:47 2016 -0800
Committer: Marcelo Vanzin 
Committed: Mon Nov 28 08:59:47 2016 -0800

--
 .../apache/spark/internal/config/package.scala  |  9 +
 .../spark/scheduler/EventLoggingListener.scala  | 13 -
 .../apache/spark/ui/env/EnvironmentPage.scala   | 12 
 .../apache/spark/ui/env/EnvironmentTab.scala|  1 +
 .../scala/org/apache/spark/util/Utils.scala | 14 +-
 .../scheduler/EventLoggingListenerSuite.scala   | 12 
 .../org/apache/spark/util/UtilsSuite.scala  | 20 
 docs/configuration.md   |  9 +
 .../spark/deploy/yarn/ExecutorRunnable.scala|  3 +--
 9 files changed, 81 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/237c3b96/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2951bdc..a69a2b5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -223,4 +223,13 @@ package object config {
   " bigger files.")
 .longConf
 .createWithDefault(4 * 1024 * 1024)
+
+  private[spark] val SECRET_REDACTION_PATTERN =
+ConfigBuilder("spark.redaction.regex")
+  .doc("Regex to decide which Spark configuration properties and 
environment variables in " +
+"driver and executor environments contain sensitive information. When 
this regex matches " +
+"a property, its value is redacted from the environment UI and various 
logs like YARN " +
+"and event logs.")
+  .stringConf
+  .createWithDefault("(?i)secret|password")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/237c3b96/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index ce78774..f39565e 100644
--- 

spark git commit: [SPARK-17732][SQL] Revert ALTER TABLE DROP PARTITION should support comparators

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 38e29824d -> d31ff9b7c


[SPARK-17732][SQL] Revert ALTER TABLE DROP PARTITION should support comparators

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/15704 fails if we use an int literal in 
`DROP PARTITION`, and we have already reverted it in branch-2.1.

This PR reverts it in the master branch and adds a regression test to make 
sure the master branch is healthy.

## How was this patch tested?

new regression test

Author: Wenchen Fan 

Closes #16036 from cloud-fan/revert.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d31ff9b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d31ff9b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d31ff9b7

Branch: refs/heads/master
Commit: d31ff9b7caf4eba66724947b68f517072e6a011c
Parents: 38e2982
Author: Wenchen Fan 
Authored: Mon Nov 28 08:46:00 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 08:46:00 2016 -0800

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   6 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  30 +-
 .../spark/sql/execution/SparkSqlParser.scala|   2 +-
 .../spark/sql/execution/command/ddl.scala   |  51 ++---
 .../datasources/DataSourceStrategy.scala|   8 +-
 .../sql/execution/command/DDLCommandSuite.scala |   9 +-
 .../spark/sql/execution/command/DDLSuite.scala  |  14 ++-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 103 ---
 8 files changed, 34 insertions(+), 189 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d31ff9b7/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 4531fe4..df85c70 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -235,7 +235,11 @@ partitionSpecLocation
 ;
 
 partitionSpec
-: PARTITION '(' expression (',' expression)* ')'
+: PARTITION '(' partitionVal (',' partitionVal)* ')'
+;
+
+partitionVal
+: identifier (EQ constant)?
 ;
 
 describeFuncName

http://git-wip-us.apache.org/repos/asf/spark/blob/d31ff9b7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8e8d374..3fa7bf1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -194,15 +194,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
*/
   override def visitPartitionSpec(
   ctx: PartitionSpecContext): Map[String, Option[String]] = 
withOrigin(ctx) {
-val parts = ctx.expression.asScala.map { pVal =>
-  expression(pVal) match {
-case UnresolvedAttribute(name :: Nil) =>
-  name -> None
-case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: 
Literal) =>
-  name -> Option(constant.toString)
-case _ =>
-  throw new ParseException("Invalid partition filter specification", 
ctx)
-  }
+val parts = ctx.partitionVal.asScala.map { pVal =>
+  val name = pVal.identifier.getText
+  val value = Option(pVal.constant).map(visitStringConstant)
+  name -> value
 }
 // Before calling `toMap`, we check duplicated keys to avoid silently 
ignore partition values
 // in partition spec like PARTITION(a='1', b='2', a='3'). The real 
semantical check for
@@ -212,23 +207,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
   }
 
   /**
-   * Create a partition filter specification.
-   */
-  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
-val parts = ctx.expression.asScala.map { pVal =>
-  expression(pVal) match {
-case EqualNullSafe(_, _) =>
-  throw new ParseException("'<=>' operator is not allowed in partition 
specification.", ctx)
-case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
-  cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), 
constant))
-case _ =>
-  throw new ParseException("Invalid 

spark git commit: [SPARK-18597][SQL] Do not push-down join conditions to the right side of a LEFT ANTI join

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 9f273c517 -> 38e29824d


[SPARK-18597][SQL] Do not push-down join conditions to the right side of a LEFT 
ANTI join

## What changes were proposed in this pull request?
We currently push down join conditions of a Left Anti join to both sides of the 
join. This is similar to Inner, Left Semi and Existence (a specialized left 
semi) join. The problem is that this changes the semantics of the join; a left 
anti join filters out rows that match the join condition.

This PR fixes this by handling left anti joins like left outer joins: conditions 
that reference only the right side are pushed into the right input, while 
conditions that reference only the left side stay in the join condition.
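
The semantic change can be reproduced with plain Scala collections. The sketch below is only an illustration under stated assumptions: `AntiJoinSketch`, `Row`, `leftAnti`, and the sample data are hypothetical and are not part of the patch or of Catalyst. It models a left anti join as "keep the left rows that have no matching right row" and compares the reference result with two rewrites of the same join condition.

```scala
object AntiJoinSketch {
  // Hypothetical row type used only in this sketch.
  final case class Row(id: Int, v: Int)

  // Left anti join: keep each left row for which no right row satisfies cond.
  def leftAnti(left: Seq[Row], right: Seq[Row])(cond: (Row, Row) => Boolean): Seq[Row] =
    left.filter(l => !right.exists(r => cond(l, r)))

  val left: Seq[Row]  = Seq(Row(1, 10), Row(2, 20), Row(3, 30))
  val right: Seq[Row] = Seq(Row(1, 0), Row(2, 5), Row(3, 0))

  // Reference: join condition is l.id = r.id AND l.v > 15 AND r.v = 0.
  val reference: Seq[Row] =
    leftAnti(left, right)((l, r) => l.id == r.id && l.v > 15 && r.v == 0)

  // Rewrite 1: the left-only conjunct becomes a filter on the left input.
  val leftPushed: Seq[Row] =
    leftAnti(left.filter(_.v > 15), right)((l, r) => l.id == r.id && r.v == 0)

  // Rewrite 2: the right-only conjunct becomes a filter on the right input.
  val rightPushed: Seq[Row] =
    leftAnti(left, right.filter(_.v == 0))((l, r) => l.id == r.id && l.v > 15)
}
```

In this sketch `rightPushed` equals `reference`, while `leftPushed` loses `Row(1, 10)`: that row fails the left-only conjunct, so it has no match and an anti join must keep it, but filtering the left input removes it first.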

## How was this patch tested?
Added tests to `FilterPushdownSuite.scala` and created a SQLQueryTestSuite file 
for left anti joins with a regression test.

Author: Herman van Hovell 

Closes #16026 from hvanhovell/SPARK-18597.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e29824
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e29824
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e29824

Branch: refs/heads/master
Commit: 38e29824d9a50464daa397c28e89610ed0aed4b6
Parents: 9f273c5
Author: Herman van Hovell 
Authored: Mon Nov 28 07:10:52 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 07:10:52 2016 -0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  |  6 ++--
 .../optimizer/FilterPushdownSuite.scala | 33 
 .../resources/sql-tests/inputs/anti-join.sql|  7 +
 .../sql-tests/results/anti-join.sql.out | 29 +
 4 files changed, 72 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/38e29824/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2679e02..805cad5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] 
with PredicateHelper {
 split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), 
left, right)
 
   joinType match {
-case _: InnerLike | LeftExistence(_) =>
+case _: InnerLike |  LeftSemi | ExistenceJoin(_) =>
   // push down the single side only join filter for both sides sub 
queries
   val newLeft = leftJoinConditions.
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -949,14 +949,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   val newJoinCond = (rightJoinConditions ++ 
commonJoinCondition).reduceLeftOption(And)
 
   Join(newLeft, newRight, RightOuter, newJoinCond)
-case LeftOuter =>
+case LeftOuter | LeftAnti =>
   // push down the right side only join filter for right sub query
   val newLeft = left
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
   val newJoinCond = (leftJoinConditions ++ 
commonJoinCondition).reduceLeftOption(And)
 
-  Join(newLeft, newRight, LeftOuter, newJoinCond)
+  Join(newLeft, newRight, joinType, newJoinCond)
 case FullOuter => j
 case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
 case UsingJoin(_, _) => sys.error("Untransformed Using join node")

http://git-wip-us.apache.org/repos/asf/spark/blob/38e29824/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 019f132..3e67282 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
   }
 
+  test("joins: push down where clause into left anti join") {
+val x = testRelation.subquery('x)
+val y = testRelation.subquery('y)
+val 

spark git commit: [SPARK-18597][SQL] Do not push-down join conditions to the right side of a LEFT ANTI join

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 a9d4febe9 -> 32b259fae


[SPARK-18597][SQL] Do not push-down join conditions to the right side of a LEFT 
ANTI join

## What changes were proposed in this pull request?
We currently push down join conditions of a Left Anti join to both sides of the 
join. This is similar to Inner, Left Semi and Existence (a specialized left 
semi) join. The problem is that this changes the semantics of the join; a left 
anti join filters out rows that match the join condition.

This PR fixes this by handling left anti joins like left outer joins: conditions 
that reference only the right side are pushed into the right input, while 
conditions that reference only the left side stay in the join condition.

## How was this patch tested?
Added tests to `FilterPushdownSuite.scala` and created a SQLQueryTestSuite file 
for left anti joins with a regression test.

Author: Herman van Hovell 

Closes #16026 from hvanhovell/SPARK-18597.

(cherry picked from commit 38e29824d9a50464daa397c28e89610ed0aed4b6)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32b259fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32b259fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32b259fa

Branch: refs/heads/branch-2.1
Commit: 32b259faed7e0573c0f465954205cbd3b94ee440
Parents: a9d4feb
Author: Herman van Hovell 
Authored: Mon Nov 28 07:10:52 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 07:11:02 2016 -0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  |  6 ++--
 .../optimizer/FilterPushdownSuite.scala | 33 
 .../resources/sql-tests/inputs/anti-join.sql|  7 +
 .../sql-tests/results/anti-join.sql.out | 29 +
 4 files changed, 72 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/32b259fa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2679e02..805cad5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] 
with PredicateHelper {
 split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), 
left, right)
 
   joinType match {
-case _: InnerLike | LeftExistence(_) =>
+case _: InnerLike |  LeftSemi | ExistenceJoin(_) =>
   // push down the single side only join filter for both sides sub 
queries
   val newLeft = leftJoinConditions.
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -949,14 +949,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   val newJoinCond = (rightJoinConditions ++ 
commonJoinCondition).reduceLeftOption(And)
 
   Join(newLeft, newRight, RightOuter, newJoinCond)
-case LeftOuter =>
+case LeftOuter | LeftAnti =>
   // push down the right side only join filter for right sub query
   val newLeft = left
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
   val newJoinCond = (leftJoinConditions ++ 
commonJoinCondition).reduceLeftOption(And)
 
-  Join(newLeft, newRight, LeftOuter, newJoinCond)
+  Join(newLeft, newRight, joinType, newJoinCond)
 case FullOuter => j
 case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
 case UsingJoin(_, _) => sys.error("Untransformed Using join node")

http://git-wip-us.apache.org/repos/asf/spark/blob/32b259fa/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 019f132..3e67282 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
   }
 
+  test("joins: push 

spark git commit: [SPARK-17783][SQL] Hide Credentials in CREATE and DESC FORMATTED/EXTENDED a PERSISTENT/TEMP Table for JDBC

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 e449f7546 -> a9d4febe9


[SPARK-17783][SQL] Hide Credentials in CREATE and DESC FORMATTED/EXTENDED a 
PERSISTENT/TEMP Table for JDBC

### What changes were proposed in this pull request?

We should never expose credentials in the output of the EXPLAIN and DESC 
FORMATTED/EXTENDED commands. However, the commands below exposed them.

In the related PR: https://github.com/apache/spark/pull/10452

> URL patterns to specify credential seems to be vary between different 
> databases.

Thus, we hide the whole `url` value if it contains the keyword `password`. We 
also hide the `password` property.
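
The masking rule described above can be sketched as follows. This is only an illustration under stated assumptions: `CredentialMasking` and `maskCredentials` are hypothetical names, not the helper this patch adds to Spark, and the `###` placeholder is taken from the sample output in this message.

```scala
object CredentialMasking {
  // Placeholder shown instead of a sensitive value (assumed from the sample output).
  val Mask = "###"

  // Hypothetical helper, not Spark's API: masks the `password` property and
  // any `url` value that contains the keyword "password".
  def maskCredentials(props: Map[String, String]): Map[String, String] =
    props.map {
      case (key, _) if key.equalsIgnoreCase("password") =>
        key -> Mask
      case (key, value)
          if key.equalsIgnoreCase("url") &&
            value.toLowerCase(java.util.Locale.ROOT).contains("password") =>
        key -> Mask
      case other =>
        other
    }
}
```

Hiding the whole `url` value, rather than trying to cut out only the password part, avoids depending on any one database's URL syntax.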

The commands in question are:

``` SQL
CREATE TABLE tab1
USING org.apache.spark.sql.jdbc
OPTIONS (
 url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass',
 dbtable 'TEST.PEOPLE',
 user 'testUser',
 password '$password')

DESC FORMATTED tab1
DESC EXTENDED tab1
```

Before the fix,
- The output of SQL statement EXPLAIN
```
== Physical Plan ==
ExecutedCommand
   +- CreateDataSourceTableCommand CatalogTable(
Table: `tab1`
Created: Wed Nov 16 23:00:10 PST 2016
Last Access: Wed Dec 31 15:59:59 PST 1969
Type: MANAGED
Provider: org.apache.spark.sql.jdbc
Storage(Properties: 
[url=jdbc:h2:mem:testdb0;user=testUser;password=testPass, dbtable=TEST.PEOPLE, 
user=testUser, password=testPass])), false
```

- The output of `DESC FORMATTED`
```
...
|Storage Desc Parameters:|                                                   |       |
|  url                   |jdbc:h2:mem:testdb0;user=testUser;password=testPass|       |
|  dbtable               |TEST.PEOPLE                                        |       |
|  user                  |testUser                                           |       |
|  password              |testPass                                           |       |
+------------------------+---------------------------------------------------+-------+
```

- The output of `DESC EXTENDED`
```
|# Detailed Table Information|CatalogTable(
Table: `default`.`tab1`
Created: Wed Nov 16 23:00:10 PST 2016
Last Access: Wed Dec 31 15:59:59 PST 1969
Type: MANAGED
Schema: [StructField(NAME,StringType,false), 
StructField(THEID,IntegerType,false)]
Provider: org.apache.spark.sql.jdbc
Storage(Location: 
file:/Users/xiaoli/IdeaProjects/sparkDelivery/spark-warehouse/tab1, Properties: 
[url=jdbc:h2:mem:testdb0;user=testUser;password=testPass, dbtable=TEST.PEOPLE, 
user=testUser, password=testPass]))|   |
```

After the fix,
- The output of SQL statement EXPLAIN
```
== Physical Plan ==
ExecutedCommand
   +- CreateDataSourceTableCommand CatalogTable(
Table: `tab1`
Created: Wed Nov 16 22:43:49 PST 2016
Last Access: Wed Dec 31 15:59:59 PST 1969
Type: MANAGED
Provider: org.apache.spark.sql.jdbc
Storage(Properties: [url=###, dbtable=TEST.PEOPLE, user=testUser, 
password=###])), false
```
- The output of `DESC FORMATTED`
```
...
|Storage Desc Parameters:|           |       |
|  url                   |###        |       |
|  dbtable               |TEST.PEOPLE|       |
|  user                  |testUser   |       |
|  password              |###        |       |
+------------------------+-----------+-------+
```

- The output of `DESC EXTENDED`
```
|# Detailed Table Information|CatalogTable(
Table: `default`.`tab1`
Created: Wed Nov 16 22:43:49 PST 2016
Last Access: Wed Dec 31 15:59:59 PST 1969
Type: MANAGED
Schema: [StructField(NAME,StringType,false), 
StructField(THEID,IntegerType,false)]
Provider: org.apache.spark.sql.jdbc
Storage(Location: 
file:/Users/xiaoli/IdeaProjects/sparkDelivery/spark-warehouse/tab1, Properties: 
[url=###, dbtable=TEST.PEOPLE, user=testUser, password=###]))|   |
```

### How was this patch tested?

Added test cases

Author: gatorsmile 

Closes #15358 from gatorsmile/maskCredentials.

(cherry picked from commit 9f273c5173c05017c3009faaf3e10f2f70a842d0)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9d4febe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9d4febe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9d4febe

Branch: refs/heads/branch-2.1
Commit: a9d4febe900aa3eb9c595089e7283a64a24c8761

spark git commit: [SPARK-17783][SQL] Hide Credentials in CREATE and DESC FORMATTED/EXTENDED a PERSISTENT/TEMP Table for JDBC

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 70dfdcbbf -> 9f273c517


[SPARK-17783][SQL] Hide Credentials in CREATE and DESC FORMATTED/EXTENDED a 
PERSISTENT/TEMP Table for JDBC

### What changes were proposed in this pull request?

We should never expose credentials in the output of the EXPLAIN and DESC 
FORMATTED/EXTENDED commands. However, the commands below exposed them.

In the related PR: https://github.com/apache/spark/pull/10452

> URL patterns to specify credential seems to be vary between different 
> databases.

Thus, we hide the whole `url` value if it contains the keyword `password`. We 
also hide the `password` property.

The commands in question are:

``` SQL
CREATE TABLE tab1
USING org.apache.spark.sql.jdbc
OPTIONS (
 url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass',
 dbtable 'TEST.PEOPLE',
 user 'testUser',
 password '$password')

DESC FORMATTED tab1
DESC EXTENDED tab1
```

Before the fix,
- The output of SQL statement EXPLAIN
```
== Physical Plan ==
ExecutedCommand
   +- CreateDataSourceTableCommand CatalogTable(
Table: `tab1`
Created: Wed Nov 16 23:00:10 PST 2016
Last Access: Wed Dec 31 15:59:59 PST 1969
Type: MANAGED
Provider: org.apache.spark.sql.jdbc
Storage(Properties: 
[url=jdbc:h2:mem:testdb0;user=testUser;password=testPass, dbtable=TEST.PEOPLE, 
user=testUser, password=testPass])), false
```

- The output of `DESC FORMATTED`
```
...
|Storage Desc Parameters:|                                                   |       |
|  url                   |jdbc:h2:mem:testdb0;user=testUser;password=testPass|       |
|  dbtable               |TEST.PEOPLE                                        |       |
|  user                  |testUser                                           |       |
|  password              |testPass                                           |       |
+------------------------+---------------------------------------------------+-------+
```

- The output of `DESC EXTENDED`
```
|# Detailed Table Information|CatalogTable(
Table: `default`.`tab1`
Created: Wed Nov 16 23:00:10 PST 2016
Last Access: Wed Dec 31 15:59:59 PST 1969
Type: MANAGED
Schema: [StructField(NAME,StringType,false), 
StructField(THEID,IntegerType,false)]
Provider: org.apache.spark.sql.jdbc
Storage(Location: 
file:/Users/xiaoli/IdeaProjects/sparkDelivery/spark-warehouse/tab1, Properties: 
[url=jdbc:h2:mem:testdb0;user=testUser;password=testPass, dbtable=TEST.PEOPLE, 
user=testUser, password=testPass]))|   |
```

After the fix,
- The output of SQL statement EXPLAIN
```
== Physical Plan ==
ExecutedCommand
   +- CreateDataSourceTableCommand CatalogTable(
Table: `tab1`
Created: Wed Nov 16 22:43:49 PST 2016
Last Access: Wed Dec 31 15:59:59 PST 1969
Type: MANAGED
Provider: org.apache.spark.sql.jdbc
Storage(Properties: [url=###, dbtable=TEST.PEOPLE, user=testUser, 
password=###])), false
```
- The output of `DESC FORMATTED`
```
...
|Storage Desc Parameters:|           |       |
|  url                   |###        |       |
|  dbtable               |TEST.PEOPLE|       |
|  user                  |testUser   |       |
|  password              |###        |       |
+------------------------+-----------+-------+
```

- The output of `DESC EXTENDED`
```
|# Detailed Table Information|CatalogTable(
Table: `default`.`tab1`
Created: Wed Nov 16 22:43:49 PST 2016
Last Access: Wed Dec 31 15:59:59 PST 1969
Type: MANAGED
Schema: [StructField(NAME,StringType,false), 
StructField(THEID,IntegerType,false)]
Provider: org.apache.spark.sql.jdbc
Storage(Location: 
file:/Users/xiaoli/IdeaProjects/sparkDelivery/spark-warehouse/tab1, Properties: 
[url=###, dbtable=TEST.PEOPLE, user=testUser, password=###]))|   |
```

### How was this patch tested?

Added test cases

Author: gatorsmile 

Closes #15358 from gatorsmile/maskCredentials.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f273c51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f273c51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f273c51

Branch: refs/heads/master
Commit: 9f273c5173c05017c3009faaf3e10f2f70a842d0
Parents: 70dfdcb
Author: gatorsmile 
Authored: Mon Nov 28 07:04:38 2016 -0800
Committer: Herman van Hovell 

spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 9070bd31c -> 759bd4a6a


[SPARK-18118][SQL] fix a compilation error due to nested JavaBeans

Remove this reference.

(cherry picked from commit 70dfdcbbf11c9c3174abc111afa2250236e31af2)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/759bd4a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/759bd4a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/759bd4a6

Branch: refs/heads/branch-2.0
Commit: 759bd4a6a7ea83b654089c6bd1d1574c709ca35f
Parents: 9070bd3
Author: Herman van Hovell 
Authored: Mon Nov 28 04:41:43 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 04:46:32 2016 -0800

--
 .../apache/spark/sql/catalyst/expressions/objects/objects.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/759bd4a6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 8043475..c17c807 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -695,7 +695,7 @@ case class InitializeJavaBean(beanInstance: Expression, 
setters: Map[String, Exp
 val fieldGen = fieldValue.genCode(ctx)
 s"""
${fieldGen.code}
-   this.${javaBeanInstance}.$setterMethod(${fieldGen.value});
+   ${javaBeanInstance}.$setterMethod(${fieldGen.value});
  """
 }
 val initializeCode = ctx.splitExpressions(ctx.INPUT_ROW, initialize.toSeq)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 712bd5abc -> e449f7546


[SPARK-18118][SQL] fix a compilation error due to nested JavaBeans

Remove this reference.

(cherry picked from commit 70dfdcbbf11c9c3174abc111afa2250236e31af2)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e449f754
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e449f754
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e449f754

Branch: refs/heads/branch-2.1
Commit: e449f7546897c5f29075e6a0913a5a6106bcbb5f
Parents: 712bd5a
Author: Herman van Hovell 
Authored: Mon Nov 28 04:41:43 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 04:45:23 2016 -0800

--
 .../apache/spark/sql/catalyst/expressions/objects/objects.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e449f754/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 6952f54..e517ec1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -905,7 +905,7 @@ case class InitializeJavaBean(beanInstance: Expression, 
setters: Map[String, Exp
 val fieldGen = fieldValue.genCode(ctx)
 s"""
${fieldGen.code}
-   this.${javaBeanInstance}.$setterMethod(${fieldGen.value});
+   ${javaBeanInstance}.$setterMethod(${fieldGen.value});
  """
 }
 val initializeCode = ctx.splitExpressions(ctx.INPUT_ROW, initialize.toSeq)





spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans\nRemove this reference.

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master f075cd9cb -> 70dfdcbbf


[SPARK-18118][SQL] fix a compilation error due to nested JavaBeans\nRemove this 
reference.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70dfdcbb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70dfdcbb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70dfdcbb

Branch: refs/heads/master
Commit: 70dfdcbbf11c9c3174abc111afa2250236e31af2
Parents: f075cd9
Author: Herman van Hovell 
Authored: Mon Nov 28 04:41:43 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 04:41:43 2016 -0800

--
 .../apache/spark/sql/catalyst/expressions/objects/objects.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/70dfdcbb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 6952f54..e517ec1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -905,7 +905,7 @@ case class InitializeJavaBean(beanInstance: Expression, 
setters: Map[String, Exp
 val fieldGen = fieldValue.genCode(ctx)
 s"""
${fieldGen.code}
-   this.${javaBeanInstance}.$setterMethod(${fieldGen.value});
+   ${javaBeanInstance}.$setterMethod(${fieldGen.value});
  """
 }
 val initializeCode = ctx.splitExpressions(ctx.INPUT_ROW, initialize.toSeq)





spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 d6e027e61 -> 712bd5abc


[SPARK-18118][SQL] fix a compilation error due to nested JavaBeans

## What changes were proposed in this pull request?

This PR avoids a compilation error caused by a generated method exceeding the 
64KB Java bytecode size limit. The error occurs because the generated Java code 
of `SpecificSafeProjection.apply()` for nested JavaBeans is too big. This PR 
avoids the error by splitting the big code chunk into multiple methods, calling 
`CodegenContext.splitExpressions` in `InitializeJavaBean.doGenCode`. 
An object reference to the JavaBean is stored in an instance variable 
`javaBean...`, which the split methods then reference.
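
The splitting idea can be sketched as follows. This is a simplified illustration, not the implementation of `CodegenContext.splitExpressions`: `SplitSketch`, `splitIntoMethods`, and the character-count threshold are assumptions made for the example. The statements refer to the bean through a field name (`javaBean` here), because a local variable in the calling method would not be visible inside the generated helper methods.

```scala
object SplitSketch {
  // Hypothetical helper: groups Java statements into chunks whose combined
  // length stays within `maxChars`, then wraps each chunk in its own private
  // method. Returns the method definitions and the calls that replace the
  // original inline code.
  def splitIntoMethods(
      prefix: String,
      statements: Seq[String],
      maxChars: Int): (Seq[String], Seq[String]) = {
    val chunks = statements.foldLeft(Vector.empty[Vector[String]]) { (acc, stmt) =>
      acc.lastOption match {
        // The statement still fits into the current chunk.
        case Some(last) if last.map(_.length).sum + stmt.length <= maxChars =>
          acc.init :+ (last :+ stmt)
        // Otherwise start a new chunk.
        case _ =>
          acc :+ Vector(stmt)
      }
    }
    val names = chunks.indices.map(i => s"${prefix}_$i")
    val methods = chunks.zip(names).map { case (body, name) =>
      val indented = body.map(line => "  " + line).mkString("\n")
      s"private void $name(InternalRow i) {\n$indented\n}"
    }
    val calls = names.map(name => s"$name(i);")
    (methods, calls)
  }
}
```

For example, `SplitSketch.splitIntoMethods("apply", Seq("javaBean.setA(a);", "javaBean.setB(b);", "javaBean.setC(c);"), 40)` yields two helper methods and the calls `apply_0(i);` and `apply_1(i);`.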

Generated code with this PR

/* 22098 */   private void apply130_0(InternalRow i) {
...
/* 22125 */ boolean isNull238 = i.isNullAt(2);
/* 22126 */ InternalRow value238 = isNull238 ? null : (i.getStruct(2, 3));
/* 22127 */ boolean isNull236 = false;
/* 22128 */ test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value236 = null;
/* 22129 */ if (!false && isNull238) {
/* 22130 */
/* 22131 */   final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value239 = null;
/* 22132 */   isNull236 = true;
/* 22133 */   value236 = value239;
/* 22134 */ } else {
/* 22135 */
/* 22136 */   final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value241 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$Nesting1();
/* 22137 */   this.javaBean14 = value241;
/* 22138 */   if (!false) {
/* 22139 */ apply25_0(i);
/* 22140 */ apply25_1(i);
/* 22141 */ apply25_2(i);
/* 22142 */   }
/* 22143 */   isNull236 = false;
/* 22144 */   value236 = value241;
/* 22145 */ }
/* 22146 */ this.javaBean.setField2(value236);
/* 22147 */
/* 22148 */   }
...
/* 22928 */   public java.lang.Object apply(java.lang.Object _i) {
/* 22929 */ InternalRow i = (InternalRow) _i;
/* 22930 */
/* 22931 */ final test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean value1 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean();
/* 22932 */ this.javaBean = value1;
/* 22933 */ if (!false) {
/* 22934 */   apply130_0(i);
/* 22935 */   apply130_1(i);
/* 22936 */   apply130_2(i);
/* 22937 */   apply130_3(i);
/* 22938 */   apply130_4(i);
/* 22939 */ }
/* 22940 */ if (false) {
/* 22941 */   mutableRow.setNullAt(0);
/* 22942 */ } else {
/* 22943 */
/* 22944 */   mutableRow.update(0, value1);
/* 22945 */ }
/* 22946 */
/* 22947 */ return mutableRow;
/* 22948 */   }


## How was this patch tested?

added a test suite into `JavaDatasetSuite.java`

Author: Kazuaki Ishizaki 

Closes #16032 from kiszk/SPARK-18118.

(cherry picked from commit f075cd9cb7157819df9aec67baee8913c4ed5c53)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/712bd5ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/712bd5ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/712bd5ab

Branch: refs/heads/branch-2.1
Commit: 712bd5abc827c4eaf3f53bfc9155c8535584ca96
Parents: d6e027e
Author: Kazuaki Ishizaki 
Authored: Mon Nov 28 04:18:35 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 04:18:46 2016 -0800

--
 .../catalyst/expressions/objects/objects.scala  |  10 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 429 +++
 2 files changed, 437 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/712bd5ab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 5c27179..6952f54 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -896,19 +896,25 @@ case class InitializeJavaBean(beanInstance: Expression, 
setters: Map[String, Exp
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
 val instanceGen = beanInstance.genCode(ctx)
 
+val javaBeanInstance = ctx.freshName("javaBean")
+val beanInstanceJavaType = ctx.javaType(beanInstance.dataType)
+ctx.addMutableState(beanInstanceJavaType, javaBeanInstance, "")
+
 val initialize = setters.map {
   case 

spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e67ce4837 -> 9070bd31c


[SPARK-18118][SQL] fix a compilation error due to nested JavaBeans

## What changes were proposed in this pull request?

This PR avoids a compilation error caused by a generated method exceeding the 
64KB Java bytecode size limit. The error occurs because the generated Java code 
of `SpecificSafeProjection.apply()` for nested JavaBeans is too big. This PR 
avoids the error by splitting the big code chunk into multiple methods, calling 
`CodegenContext.splitExpressions` in `InitializeJavaBean.doGenCode`. 
An object reference to the JavaBean is stored in an instance variable 
`javaBean...`, which the split methods then reference.

Generated code with this PR

/* 22098 */   private void apply130_0(InternalRow i) {
...
/* 22125 */ boolean isNull238 = i.isNullAt(2);
/* 22126 */ InternalRow value238 = isNull238 ? null : (i.getStruct(2, 3));
/* 22127 */ boolean isNull236 = false;
/* 22128 */ test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value236 = null;
/* 22129 */ if (!false && isNull238) {
/* 22130 */
/* 22131 */   final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value239 = null;
/* 22132 */   isNull236 = true;
/* 22133 */   value236 = value239;
/* 22134 */ } else {
/* 22135 */
/* 22136 */   final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value241 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$Nesting1();
/* 22137 */   this.javaBean14 = value241;
/* 22138 */   if (!false) {
/* 22139 */ apply25_0(i);
/* 22140 */ apply25_1(i);
/* 22141 */ apply25_2(i);
/* 22142 */   }
/* 22143 */   isNull236 = false;
/* 22144 */   value236 = value241;
/* 22145 */ }
/* 22146 */ this.javaBean.setField2(value236);
/* 22147 */
/* 22148 */   }
...
/* 22928 */   public java.lang.Object apply(java.lang.Object _i) {
/* 22929 */ InternalRow i = (InternalRow) _i;
/* 22930 */
/* 22931 */ final test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean value1 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean();
/* 22932 */ this.javaBean = value1;
/* 22933 */ if (!false) {
/* 22934 */   apply130_0(i);
/* 22935 */   apply130_1(i);
/* 22936 */   apply130_2(i);
/* 22937 */   apply130_3(i);
/* 22938 */   apply130_4(i);
/* 22939 */ }
/* 22940 */ if (false) {
/* 22941 */   mutableRow.setNullAt(0);
/* 22942 */ } else {
/* 22943 */
/* 22944 */   mutableRow.update(0, value1);
/* 22945 */ }
/* 22946 */
/* 22947 */ return mutableRow;
/* 22948 */   }


## How was this patch tested?

added a test suite into `JavaDatasetSuite.java`

Author: Kazuaki Ishizaki 

Closes #16032 from kiszk/SPARK-18118.

(cherry picked from commit f075cd9cb7157819df9aec67baee8913c4ed5c53)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9070bd31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9070bd31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9070bd31

Branch: refs/heads/branch-2.0
Commit: 9070bd31c243d74d3c28c5208bc11e41876590ca
Parents: e67ce48
Author: Kazuaki Ishizaki 
Authored: Mon Nov 28 04:18:35 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 04:19:01 2016 -0800

--
 .../catalyst/expressions/objects/objects.scala  |  10 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 429 +++
 2 files changed, 437 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9070bd31/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index d9c29b3..8043475 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -686,19 +686,25 @@ case class InitializeJavaBean(beanInstance: Expression, 
setters: Map[String, Exp
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
 val instanceGen = beanInstance.genCode(ctx)
 
+val javaBeanInstance = ctx.freshName("javaBean")
+val beanInstanceJavaType = ctx.javaType(beanInstance.dataType)
+ctx.addMutableState(beanInstanceJavaType, javaBeanInstance, "")
+
 val initialize = setters.map {
   case 

spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 454b80499 -> f075cd9cb


[SPARK-18118][SQL] fix a compilation error due to nested JavaBeans

## What changes were proposed in this pull request?

This PR avoids a compilation error caused by a generated method exceeding the 
64KB Java bytecode size limit. The error occurs because the generated Java code 
of `SpecificSafeProjection.apply()` for nested JavaBeans is too big. This PR 
avoids the error by splitting the big code chunk into multiple methods, calling 
`CodegenContext.splitExpressions` in `InitializeJavaBean.doGenCode`. 
An object reference to the JavaBean is stored in an instance variable 
`javaBean...`, which the split methods then reference.

Generated code with this PR

/* 22098 */   private void apply130_0(InternalRow i) {
...
/* 22125 */ boolean isNull238 = i.isNullAt(2);
/* 22126 */ InternalRow value238 = isNull238 ? null : (i.getStruct(2, 3));
/* 22127 */ boolean isNull236 = false;
/* 22128 */ test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value236 = null;
/* 22129 */ if (!false && isNull238) {
/* 22130 */
/* 22131 */   final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value239 = null;
/* 22132 */   isNull236 = true;
/* 22133 */   value236 = value239;
/* 22134 */ } else {
/* 22135 */
/* 22136 */   final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value241 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$Nesting1();
/* 22137 */   this.javaBean14 = value241;
/* 22138 */   if (!false) {
/* 22139 */ apply25_0(i);
/* 22140 */ apply25_1(i);
/* 22141 */ apply25_2(i);
/* 22142 */   }
/* 22143 */   isNull236 = false;
/* 22144 */   value236 = value241;
/* 22145 */ }
/* 22146 */ this.javaBean.setField2(value236);
/* 22147 */
/* 22148 */   }
...
/* 22928 */   public java.lang.Object apply(java.lang.Object _i) {
/* 22929 */ InternalRow i = (InternalRow) _i;
/* 22930 */
/* 22931 */ final test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean value1 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean();
/* 22932 */ this.javaBean = value1;
/* 22933 */ if (!false) {
/* 22934 */   apply130_0(i);
/* 22935 */   apply130_1(i);
/* 22936 */   apply130_2(i);
/* 22937 */   apply130_3(i);
/* 22938 */   apply130_4(i);
/* 22939 */ }
/* 22940 */ if (false) {
/* 22941 */   mutableRow.setNullAt(0);
/* 22942 */ } else {
/* 22943 */
/* 22944 */   mutableRow.update(0, value1);
/* 22945 */ }
/* 22946 */
/* 22947 */ return mutableRow;
/* 22948 */   }


## How was this patch tested?

added a test suite into `JavaDatasetSuite.java`

Author: Kazuaki Ishizaki 

Closes #16032 from kiszk/SPARK-18118.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f075cd9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f075cd9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f075cd9c

Branch: refs/heads/master
Commit: f075cd9cb7157819df9aec67baee8913c4ed5c53
Parents: 454b804
Author: Kazuaki Ishizaki 
Authored: Mon Nov 28 04:18:35 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 04:18:35 2016 -0800

--
 .../catalyst/expressions/objects/objects.scala  |  10 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 429 +++
 2 files changed, 437 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f075cd9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 5c27179..6952f54 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -896,19 +896,25 @@ case class InitializeJavaBean(beanInstance: Expression, 
setters: Map[String, Exp
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
 val instanceGen = beanInstance.genCode(ctx)
 
+val javaBeanInstance = ctx.freshName("javaBean")
+val beanInstanceJavaType = ctx.javaType(beanInstance.dataType)
+ctx.addMutableState(beanInstanceJavaType, javaBeanInstance, "")
+
 val initialize = setters.map {
   case (setterMethod, fieldValue) =>
 val fieldGen = fieldValue.genCode(ctx)
 s"""
${fieldGen.code}
-   

spark git commit: [SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same order.

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 886f880df -> d6e027e61


[SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same 
order.

## What changes were proposed in this pull request?
The `CollapseWindow` optimizer rule changes the order of output attributes. 
This modifies the output of the plan, which the optimizer must not do. It also 
breaks things like `collect()`, for which we use a `RowEncoder` that assumes 
that the output attributes of the executed plan are equal to those of the 
logical plan.
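
The ordering problem can be reproduced with a toy model. The sketch below is not Catalyst code: `Plan`, `Relation`, `Window`, and `collapse` are simplified stand-ins, and it assumes that a window node outputs its child's columns followed by its own window expressions.

```scala
object CollapseWindowSketch {
  sealed trait Plan { def output: Seq[String] }

  case class Relation(output: Seq[String]) extends Plan

  // Assumption of this model: a window node outputs the child's columns
  // followed by its own window expressions.
  case class Window(windowExpressions: Seq[String], child: Plan) extends Plan {
    def output: Seq[String] = child.output ++ windowExpressions
  }

  // Collapses two directly nested windows into one. `innerFirst = true`
  // mirrors `we2 ++ we1` (the fix); `false` mirrors `we1 ++ we2` (the bug).
  def collapse(plan: Plan, innerFirst: Boolean): Plan = plan match {
    case Window(we1, Window(we2, grandChild)) =>
      Window(if (innerFirst) we2 ++ we1 else we1 ++ we2, grandChild)
    case other =>
      other
  }
}
```

With `Window(Seq("avg_b"), Window(Seq("sum_b"), Relation(Seq("a", "b"))))`, the original output is `a, b, sum_b, avg_b`. Collapsing with the inner expressions first keeps that order, while the outer-first variant produces `a, b, avg_b, sum_b`.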

## How was this patch tested?
I have updated an incorrect test in `CollapseWindowSuite`.

Author: Herman van Hovell 

Closes #16027 from hvanhovell/SPARK-18604.

(cherry picked from commit 454b8049916a0353772a0ea5cfe14b62cbd81df4)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6e027e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6e027e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6e027e6

Branch: refs/heads/branch-2.1
Commit: d6e027e610bdff0123e71925735ecedcf4787b83
Parents: 886f880
Author: Herman van Hovell 
Authored: Mon Nov 28 02:56:26 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 02:56:38 2016 -0800

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../sql/catalyst/optimizer/CollapseWindowSuite.scala   | 13 -
 2 files changed, 9 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d6e027e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6ba8b33..2679e02 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -545,7 +545,7 @@ object CollapseRepartition extends Rule[LogicalPlan] {
 object CollapseWindow extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
 case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 =>
-  w.copy(windowExpressions = we1 ++ we2, child = grandChild)
+  w.copy(windowExpressions = we2 ++ we1, child = grandChild)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d6e027e6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
index 797076e..3f7d1d9 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -46,12 +46,15 @@ class CollapseWindowSuite extends PlanTest {
   .window(Seq(sum(b).as('sum_b)), partitionSpec1, orderSpec1)
   .window(Seq(avg(b).as('avg_b)), partitionSpec1, orderSpec1)
 
-val optimized = Optimize.execute(query.analyze)
+val analyzed = query.analyze
+val optimized = Optimize.execute(analyzed)
+assert(analyzed.output === optimized.output)
+
 val correctAnswer = testRelation.window(Seq(
-avg(b).as('avg_b),
-sum(b).as('sum_b),
-max(a).as('max_a),
-min(a).as('min_a)), partitionSpec1, orderSpec1)
+  min(a).as('min_a),
+  max(a).as('max_a),
+  sum(b).as('sum_b),
+  avg(b).as('avg_b)), partitionSpec1, orderSpec1)
 
 comparePlans(optimized, correctAnswer)
   }





spark git commit: [SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same order.

2016-11-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 87141622e -> 454b80499


[SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same 
order.

## What changes were proposed in this pull request?
The `CollapseWindow` optimizer rule changes the order of output attributes. 
This modifies the output of the plan, which the optimizer must not do. It also 
breaks things like `collect()`, for which we use a `RowEncoder` that assumes 
that the output attributes of the executed plan are equal to those of the 
logical plan.

## How was this patch tested?
I have updated an incorrect test in `CollapseWindowSuite`.

Author: Herman van Hovell 

Closes #16027 from hvanhovell/SPARK-18604.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/454b8049
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/454b8049
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/454b8049

Branch: refs/heads/master
Commit: 454b8049916a0353772a0ea5cfe14b62cbd81df4
Parents: 8714162
Author: Herman van Hovell 
Authored: Mon Nov 28 02:56:26 2016 -0800
Committer: Herman van Hovell 
Committed: Mon Nov 28 02:56:26 2016 -0800

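----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../sql/catalyst/optimizer/CollapseWindowSuite.scala   | 13 ++++++++-----
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------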


http://git-wip-us.apache.org/repos/asf/spark/blob/454b8049/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6ba8b33..2679e02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -545,7 +545,7 @@ object CollapseRepartition extends Rule[LogicalPlan] {
 object CollapseWindow extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 =>
-      w.copy(windowExpressions = we1 ++ we2, child = grandChild)
+      w.copy(windowExpressions = we2 ++ we1, child = grandChild)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/454b8049/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
index 797076e..3f7d1d9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -46,12 +46,15 @@ class CollapseWindowSuite extends PlanTest {
       .window(Seq(sum(b).as('sum_b)), partitionSpec1, orderSpec1)
       .window(Seq(avg(b).as('avg_b)), partitionSpec1, orderSpec1)
 
-    val optimized = Optimize.execute(query.analyze)
+    val analyzed = query.analyze
+    val optimized = Optimize.execute(analyzed)
+    assert(analyzed.output === optimized.output)
+
     val correctAnswer = testRelation.window(Seq(
-        avg(b).as('avg_b),
-        sum(b).as('sum_b),
-        max(a).as('max_a),
-        min(a).as('min_a)), partitionSpec1, orderSpec1)
+      min(a).as('min_a),
+      max(a).as('max_a),
+      sum(b).as('sum_b),
+      avg(b).as('avg_b)), partitionSpec1, orderSpec1)
 
     comparePlans(optimized, correctAnswer)
   }

