spark git commit: [SPARK-18339][SPARK-18513][SQL] Don't push down current_timestamp for filters in StructuredStreaming and persist batch and watermark timestamps to offset log.
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 27a1a5c99 -> ea6957da2

[SPARK-18339][SPARK-18513][SQL] Don't push down current_timestamp for filters in StructuredStreaming and persist batch and watermark timestamps to offset log.

## What changes were proposed in this pull request?

For the following workflow:

1. I have a column called time which is at minute-level precision in a streaming DataFrame
2. I want to perform a groupBy over time, followed by count
3. Then I want my MemorySink to hold only the last 30 minutes of counts, which I do with .where('time >= current_timestamp().cast("long") - 30 * 60)

what happens is that the `filter` gets pushed down below the aggregation, so the filter runs on the source data of the aggregation instead of on its result (where I actually want to filter). The main issue here is that `current_timestamp` is non-deterministic in the streaming context, so a filter containing it should not be pushed down. Whether this requires us to store the `current_timestamp` for each trigger of the streaming job is something to discuss.

Furthermore, we want to persist the current batch timestamp and the watermark timestamp to the offset log so that these values are consistent across multiple executions of the same batch.

brkyvz zsxwing tdas

## How was this patch tested?

A test was added to StreamingAggregationSuite ensuring the above use case is handled. The test injects a stream of time values (in seconds) into a query that runs in complete mode and outputs only the (count) aggregation results for the past 10 seconds.

Author: Tyson Condie

Closes #15949 from tcondie/SPARK-18339.

(cherry picked from commit 3c0beea4752d39ee630a107316f40aff4a1b4ae7)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea6957da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea6957da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea6957da

Branch: refs/heads/branch-2.1
Commit: ea6957da20d3e03b95342a03a188c7ab5880cac7
Parents: 27a1a5c
Author: Tyson Condie
Authored: Mon Nov 28 23:07:17 2016 -0800
Committer: Tathagata Das
Committed: Mon Nov 28 23:08:47 2016 -0800

--
 .../expressions/datetimeExpressions.scala       |  33 +-
 .../streaming/IncrementalExecution.scala        |  19 +++-
 .../execution/streaming/StreamExecution.scala   |  67 ++---
 .../execution/streaming/StreamProgress.scala    |   4 +-
 .../spark/sql/execution/streaming/memory.scala  |   4 +
 .../StreamExecutionMetadataSuite.scala          |  35 +++
 .../streaming/StreamingAggregationSuite.scala   | 100 +++
 .../sql/streaming/StreamingQuerySuite.scala     |   4 +-
 .../spark/sql/streaming/WatermarkSuite.scala    |  40 +---
 9 files changed, 273 insertions(+), 33 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ea6957da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 1db1d19..ef1ac36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -17,14 +17,14 @@ package org.apache.spark.sql.catalyst.expressions

+import java.sql.Timestamp
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Locale, TimeZone}

 import scala.util.Try

 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback,
-  ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -72,6 +72,35 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }

 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent the optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType)
+  extends LeafExpression with Nondeterministic with CodegenFallback {
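(The diff above is truncated by the archive.) To make the misbehavior described in the PR text concrete, here is a minimal sketch of the kind of query it refers to. It is illustrative, not code from the patch: it assumes a MemoryStream source and made-up column/query names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.current_timestamp

val spark = SparkSession.builder().master("local[2]").appName("pushdown-sketch").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

// Event times in seconds, grouped and counted.
val input = MemoryStream[Long]
val counts = input.toDF().toDF("time").groupBy($"time").count()

// Intent: keep only the *aggregated* rows from the last 30 minutes.
// Before this fix, the optimizer could push this predicate below the
// aggregation, so it filtered the raw input rows instead of the result.
val recent = counts.where($"time" >= current_timestamp().cast("long") - 30 * 60)

val query = recent.writeStream
  .outputMode("complete")   // complete mode, as in the new regression test
  .format("memory")
  .queryName("recent_counts")
  .start()
```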
spark git commit: [SPARK-18339][SPARK-18513][SQL] Don't push down current_timestamp for filters in StructuredStreaming and persist batch and watermark timestamps to offset log.
Repository: spark
Updated Branches:
  refs/heads/master e2318ede0 -> 3c0beea47

[SPARK-18339][SPARK-18513][SQL] Don't push down current_timestamp for filters in StructuredStreaming and persist batch and watermark timestamps to offset log.

## What changes were proposed in this pull request?

For the following workflow:

1. I have a column called time which is at minute-level precision in a streaming DataFrame
2. I want to perform a groupBy over time, followed by count
3. Then I want my MemorySink to hold only the last 30 minutes of counts, which I do with .where('time >= current_timestamp().cast("long") - 30 * 60)

what happens is that the `filter` gets pushed down below the aggregation, so the filter runs on the source data of the aggregation instead of on its result (where I actually want to filter). The main issue here is that `current_timestamp` is non-deterministic in the streaming context, so a filter containing it should not be pushed down. Whether this requires us to store the `current_timestamp` for each trigger of the streaming job is something to discuss.

Furthermore, we want to persist the current batch timestamp and the watermark timestamp to the offset log so that these values are consistent across multiple executions of the same batch.

brkyvz zsxwing tdas

## How was this patch tested?

A test was added to StreamingAggregationSuite ensuring the above use case is handled. The test injects a stream of time values (in seconds) into a query that runs in complete mode and outputs only the (count) aggregation results for the past 10 seconds.

Author: Tyson Condie

Closes #15949 from tcondie/SPARK-18339.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c0beea4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c0beea4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c0beea4

Branch: refs/heads/master
Commit: 3c0beea4752d39ee630a107316f40aff4a1b4ae7
Parents: e2318ed
Author: Tyson Condie
Authored: Mon Nov 28 23:07:17 2016 -0800
Committer: Tathagata Das
Committed: Mon Nov 28 23:07:17 2016 -0800

--
 .../expressions/datetimeExpressions.scala       |  33 +-
 .../streaming/IncrementalExecution.scala        |  19 +++-
 .../execution/streaming/StreamExecution.scala   |  67 ++---
 .../execution/streaming/StreamProgress.scala    |   4 +-
 .../spark/sql/execution/streaming/memory.scala  |   4 +
 .../StreamExecutionMetadataSuite.scala          |  35 +++
 .../streaming/StreamingAggregationSuite.scala   | 100 +++
 .../sql/streaming/StreamingQuerySuite.scala     |   4 +-
 .../spark/sql/streaming/WatermarkSuite.scala    |  40 +---
 9 files changed, 273 insertions(+), 33 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0beea4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 1db1d19..ef1ac36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -17,14 +17,14 @@ package org.apache.spark.sql.catalyst.expressions

+import java.sql.Timestamp
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Locale, TimeZone}

 import scala.util.Try

 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback,
-  ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -72,6 +72,35 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }

 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent the optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType)
+  extends LeafExpression with Nondeterministic with CodegenFallback {
+
+  override def nullable: Boolean = false
+
+  override def prettyName: String = "current_batch_timestamp"
+
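Given the `CurrentBatchTimestamp` marker shown above, the literal substitution its scaladoc promises can be pictured roughly as below. This is an illustrative sketch of the idea only, not the patch's actual rule (which lives in StreamExecution/IncrementalExecution), and it simplifies away the expression's support for both timestamp and long result types.

```scala
import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.TimestampType

// Rewrite every CurrentBatchTimestamp into the literal timestamp captured for
// the current trigger, so all uses of "now" within one batch see the same value.
def substituteBatchTime(plan: LogicalPlan, batchTimeMs: Long): LogicalPlan =
  plan transformAllExpressions {
    case _: CurrentBatchTimestamp =>
      Literal(batchTimeMs * 1000L, TimestampType) // TimestampType literals are microseconds
  }
```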
spark git commit: [SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1759cf69a -> 27a1a5c99

[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location

## What changes were proposed in this pull request?

We failed to properly propagate table metadata for existing tables in the saveAsTable command. This caused a downstream component to treat the table as MANAGED and write data to the wrong location.

## How was this patch tested?

Unit test that fails before the patch.

Author: Eric Liang

Closes #15983 from ericl/spark-18544.

(cherry picked from commit e2318ede04fa7a756d1c8151775e1f2406a176ca)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27a1a5c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27a1a5c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27a1a5c9

Branch: refs/heads/branch-2.1
Commit: 27a1a5c99ff471ee15b56995d56cfd39b3ffe6e8
Parents: 1759cf6
Author: Eric Liang
Authored: Mon Nov 28 21:58:01 2016 -0800
Committer: Reynold Xin
Committed: Mon Nov 28 21:58:10 2016 -0800

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 21
 .../command/createDataSourceTables.scala        |  3 ++-
 .../PartitionProviderCompatibilitySuite.scala   | 19 ++
 3 files changed, 34 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/27a1a5c9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d86342..8294e41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")

       case _ =>
-        val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-        val tableType = if (storage.locationUri.isDefined) {
+        val existingTable = if (tableExists) {
+          Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+        } else {
+          None
+        }
+        val storage = if (tableExists) {
+          existingTable.get.storage
+        } else {
+          DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+        }
+        val tableType = if (tableExists) {
+          existingTable.get.tableType
+        } else if (storage.locationUri.isDefined) {
           CatalogTableType.EXTERNAL
         } else {
           CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       )
       df.sparkSession.sessionState.executePlan(
         CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
-      if (tableDesc.partitionColumnNames.nonEmpty &&
-          df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-        // Need to recover partitions into the metastore so our saved data is visible.
-        df.sparkSession.sessionState.executePlan(
-          AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
-      }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/27a1a5c9/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index add732c..422700c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand(
         className = provider,
         partitionColumns = table.partitionColumnNames,
         bucketSpec = table.bucketSpec,
-        options = table.storage.properties ++ pathOption)
+        options = table.storage.properties ++ pathOption,
+        catalogTable = Some(table))

     val result = try {
       dataSource.write(mode, df)

http://git-wip-us.apache.org/repos/asf/spark/blob/27a1a5c9/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
--

diff --git
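(The last diff above is truncated by the archive.) A hedged sketch of the user-visible scenario this fixes follows; the path and table name are made up for the example, not taken from the patch's tests.

```scala
import org.apache.spark.sql.SaveMode

// Create an EXTERNAL table backed by an explicit path.
spark.range(10).write.option("path", "/tmp/spark-18544-demo").saveAsTable("demo_tab")

// Appending must reuse the existing table's storage and tableType. Before this
// fix, the metadata was rebuilt from the writer's (empty) options, so the table
// was treated as MANAGED and rows could land in the default warehouse directory
// instead of /tmp/spark-18544-demo.
spark.range(10, 20).write.mode(SaveMode.Append).saveAsTable("demo_tab")
```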
spark git commit: [SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location
Repository: spark
Updated Branches:
  refs/heads/master d449988b8 -> e2318ede0

[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location

## What changes were proposed in this pull request?

We failed to properly propagate table metadata for existing tables in the saveAsTable command. This caused a downstream component to treat the table as MANAGED and write data to the wrong location.

## How was this patch tested?

Unit test that fails before the patch.

Author: Eric Liang

Closes #15983 from ericl/spark-18544.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2318ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2318ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2318ede

Branch: refs/heads/master
Commit: e2318ede04fa7a756d1c8151775e1f2406a176ca
Parents: d449988
Author: Eric Liang
Authored: Mon Nov 28 21:58:01 2016 -0800
Committer: Reynold Xin
Committed: Mon Nov 28 21:58:01 2016 -0800

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 21
 .../command/createDataSourceTables.scala        |  3 ++-
 .../PartitionProviderCompatibilitySuite.scala   | 19 ++
 3 files changed, 34 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d86342..8294e41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")

       case _ =>
-        val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-        val tableType = if (storage.locationUri.isDefined) {
+        val existingTable = if (tableExists) {
+          Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+        } else {
+          None
+        }
+        val storage = if (tableExists) {
+          existingTable.get.storage
+        } else {
+          DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+        }
+        val tableType = if (tableExists) {
+          existingTable.get.tableType
+        } else if (storage.locationUri.isDefined) {
           CatalogTableType.EXTERNAL
         } else {
           CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       )
       df.sparkSession.sessionState.executePlan(
         CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
-      if (tableDesc.partitionColumnNames.nonEmpty &&
-          df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-        // Need to recover partitions into the metastore so our saved data is visible.
- df.sparkSession.sessionState.executePlan( -AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd -} } } http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index add732c..422700c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand( className = provider, partitionColumns = table.partitionColumnNames, bucketSpec = table.bucketSpec, - options = table.storage.properties ++ pathOption) + options = table.storage.properties ++ pathOption, + catalogTable = Some(table)) val result = try { dataSource.write(mode, df) http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index
spark git commit: [SPARK-18058][SQL][TRIVIAL] Use dataType.sameResult(...) instead of equality on asNullable datatypes
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 c4cbdc864 -> 1759cf69a

[SPARK-18058][SQL][TRIVIAL] Use dataType.sameResult(...) instead of equality on asNullable datatypes

## What changes were proposed in this pull request?

This is absolutely minor. PR https://github.com/apache/spark/pull/15595 uses `dt1.asNullable == dt2.asNullable` expressions in a few places. It is however more efficient to call `dt1.sameType(dt2)`. I have replaced every instance of the first pattern with the second pattern (3/5 were introduced by #15595).

## How was this patch tested?

Existing tests.

Author: Herman van Hovell

Closes #16041 from hvanhovell/SPARK-18058.

(cherry picked from commit d449988b8819775fcfd27da53bb5143a7aab01f7)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1759cf69
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1759cf69
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1759cf69

Branch: refs/heads/branch-2.1
Commit: 1759cf69aa1a7059a5fe78d012a54bc0ba02677c
Parents: c4cbdc8
Author: Herman van Hovell
Authored: Mon Nov 28 21:43:33 2016 -0800
Committer: Reynold Xin
Committed: Mon Nov 28 21:43:38 2016 -0800

--
 .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +-
 .../sql/catalyst/expressions/conditionalExpressions.scala      | 2 +-
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala     | 6 +++---
 .../spark/sql/execution/datasources/DataSourceStrategy.scala   | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1759cf69/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 26d2638..db41752 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -306,7 +306,7 @@ trait CheckAnalysis extends PredicateHelper {
           // Check if the data types match.
           dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
             // SPARK-18058: we shall not care about the nullability of columns
-            if (dt1.asNullable != dt2.asNullable) {
+            if (!dt1.sameType(dt2)) {
               failAnalysis(
                 s"""
                   |${operator.nodeName} can only be performed on tables with the compatible

http://git-wip-us.apache.org/repos/asf/spark/blob/1759cf69/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index a7d9e2d..afc190e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -41,7 +41,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
     if (predicate.dataType != BooleanType) {
       TypeCheckResult.TypeCheckFailure(
         s"type of predicate expression in If should be boolean, not ${predicate.dataType}")
-    } else if (trueValue.dataType.asNullable != falseValue.dataType.asNullable) {
+    } else if (!trueValue.dataType.sameType(falseValue.dataType)) {
       TypeCheckResult.TypeCheckFailure(s"differing types in '$sql' " +
         s"(${trueValue.dataType.simpleString} and ${falseValue.dataType.simpleString}).")
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/1759cf69/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index dd6c8fd..da42df3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
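The difference between the two checks is easiest to see on a concrete pair of schemas. The snippet below is illustrative only; note that both `sameType` and `asNullable` are `private[spark]`, so this comparison is only expressible inside Spark's own code, as in `CheckAnalysis` above.

```scala
import org.apache.spark.sql.types._

val a = StructType(Seq(StructField("x", IntegerType, nullable = false)))
val b = StructType(Seq(StructField("x", IntegerType, nullable = true)))

// Both forms ignore nullability, but the first materializes two converted
// copies of the schema just to compare them; sameType compares recursively
// without allocating new DataType trees.
assert(a.asNullable == b.asNullable)
assert(a.sameType(b))
```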
spark git commit: [SPARK-18058][SQL][TRIVIAL] Use dataType.sameResult(...) instead of equality on asNullable datatypes
Repository: spark
Updated Branches:
  refs/heads/master 8b325b17e -> d449988b8

[SPARK-18058][SQL][TRIVIAL] Use dataType.sameResult(...) instead of equality on asNullable datatypes

## What changes were proposed in this pull request?

This is absolutely minor. PR https://github.com/apache/spark/pull/15595 uses `dt1.asNullable == dt2.asNullable` expressions in a few places. It is however more efficient to call `dt1.sameType(dt2)`. I have replaced every instance of the first pattern with the second pattern (3/5 were introduced by #15595).

## How was this patch tested?

Existing tests.

Author: Herman van Hovell

Closes #16041 from hvanhovell/SPARK-18058.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d449988b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d449988b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d449988b

Branch: refs/heads/master
Commit: d449988b8819775fcfd27da53bb5143a7aab01f7
Parents: 8b325b1
Author: Herman van Hovell
Authored: Mon Nov 28 21:43:33 2016 -0800
Committer: Reynold Xin
Committed: Mon Nov 28 21:43:33 2016 -0800

--
 .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +-
 .../sql/catalyst/expressions/conditionalExpressions.scala      | 2 +-
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala     | 6 +++---
 .../spark/sql/execution/datasources/DataSourceStrategy.scala   | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d449988b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 26d2638..db41752 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -306,7 +306,7 @@ trait CheckAnalysis extends PredicateHelper {
           // Check if the data types match.
dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) => // SPARK-18058: we shall not care about the nullability of columns -if (dt1.asNullable != dt2.asNullable) { +if (!dt1.sameType(dt2)) { failAnalysis( s""" |${operator.nodeName} can only be performed on tables with the compatible http://git-wip-us.apache.org/repos/asf/spark/blob/d449988b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala index a7d9e2d..afc190e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala @@ -41,7 +41,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi if (predicate.dataType != BooleanType) { TypeCheckResult.TypeCheckFailure( s"type of predicate expression in If should be boolean, not ${predicate.dataType}") -} else if (trueValue.dataType.asNullable != falseValue.dataType.asNullable) { +} else if (!trueValue.dataType.sameType(falseValue.dataType)) { TypeCheckResult.TypeCheckFailure(s"differing types in '$sql' " + s"(${trueValue.dataType.simpleString} and ${falseValue.dataType.simpleString}).") } else { http://git-wip-us.apache.org/repos/asf/spark/blob/d449988b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 4e333d5..7aaefc8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -134,7 +134,7 @@ abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends Binar childrenResolved
spark git commit: [SPARK-18547][CORE] Propagate I/O encryption key when executors register.
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 45e2b3c0e -> c4cbdc864

[SPARK-18547][CORE] Propagate I/O encryption key when executors register.

This change modifies the method used to propagate encryption keys used during shuffle. Instead of relying on YARN's UserGroupInformation credential propagation, this change explicitly distributes the key using the messages exchanged between driver and executor during registration. When RPC encryption is enabled, this means key propagation is also secure.

This allows shuffle encryption to work in non-YARN mode, which means that it's easier to write unit tests for areas of the code that are affected by the feature.

The key is stored in the SecurityManager; because there are many instances of that class used in the code, the key is only guaranteed to exist in the instance managed by the SparkEnv. This path was chosen to avoid storing the key in the SparkConf, which would risk having the key written to disk as part of the configuration (as, for example, is done when starting YARN applications).

Tested by new and existing unit tests (which were moved from the YARN module to core), and by running apps with shuffle encryption enabled.

Author: Marcelo Vanzin

Closes #15981 from vanzin/SPARK-18547.

(cherry picked from commit 8b325b17ecdf013b7a6edcb7ee3773546bd914df)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4cbdc86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4cbdc86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4cbdc86

Branch: refs/heads/branch-2.1
Commit: c4cbdc864f7191ab1d49cdc360fe78ec16f48db5
Parents: 45e2b3c
Author: Marcelo Vanzin
Authored: Mon Nov 28 21:10:57 2016 -0800
Committer: Shixiong Zhu
Committed: Mon Nov 28 21:11:04 2016 -0800

--
 .../org/apache/spark/SecurityManager.scala      |  23 +---
 .../scala/org/apache/spark/SparkContext.scala   |   4 -
 .../main/scala/org/apache/spark/SparkEnv.scala  |  33 +++--
 .../executor/CoarseGrainedExecutorBackend.scala |   6 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |   7 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   6 +-
 .../spark/security/CryptoStreamUtils.scala      |  28 ++--
 .../spark/serializer/SerializerManager.scala    |  18 ++-
 .../spark/security/CryptoStreamUtilsSuite.scala | 135 +++
 docs/configuration.md                           |   3 +-
 .../spark/executor/MesosExecutorBackend.scala   |   2 +-
 .../cluster/mesos/MesosClusterManager.scala     |   4 +
 .../mesos/MesosClusterManagerSuite.scala        |  11 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |   5 -
 .../spark/deploy/yarn/IOEncryptionSuite.scala   | 108 ---
 15 files changed, 166 insertions(+), 227 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c4cbdc86/core/src/main/scala/org/apache/spark/SecurityManager.scala
--

diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 199365a..87fe563 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -21,7 +21,6 @@ import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
 import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
-import javax.crypto.KeyGenerator
 import javax.net.ssl._

 import com.google.common.hash.HashCodes

@@ -33,7 +32,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import
org.apache.spark.network.sasl.SecretKeyHolder -import org.apache.spark.security.CryptoStreamUtils._ import org.apache.spark.util.Utils /** @@ -185,7 +183,9 @@ import org.apache.spark.util.Utils * setting `spark.ssl.useNodeLocalConf` to `true`. */ -private[spark] class SecurityManager(sparkConf: SparkConf) +private[spark] class SecurityManager( +sparkConf: SparkConf, +ioEncryptionKey: Option[Array[Byte]] = None) extends Logging with SecretKeyHolder { import SecurityManager._ @@ -415,6 +415,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf) logInfo("Changing acls enabled to: " + aclsOn) } + def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey + /** * Generates or looks up the secret key. * @@ -559,19 +561,4 @@ private[spark] object SecurityManager { // key used to store the spark secret in the Hadoop UGI val SECRET_LOOKUP_KEY = "sparkCookie" - /** - * Setup the cryptographic key used by IO
spark git commit: [SPARK-18547][CORE] Propagate I/O encryption key when executors register.
Repository: spark
Updated Branches:
  refs/heads/master 1633ff3b6 -> 8b325b17e

[SPARK-18547][CORE] Propagate I/O encryption key when executors register.

This change modifies the method used to propagate encryption keys used during shuffle. Instead of relying on YARN's UserGroupInformation credential propagation, this change explicitly distributes the key using the messages exchanged between driver and executor during registration. When RPC encryption is enabled, this means key propagation is also secure.

This allows shuffle encryption to work in non-YARN mode, which means that it's easier to write unit tests for areas of the code that are affected by the feature.

The key is stored in the SecurityManager; because there are many instances of that class used in the code, the key is only guaranteed to exist in the instance managed by the SparkEnv. This path was chosen to avoid storing the key in the SparkConf, which would risk having the key written to disk as part of the configuration (as, for example, is done when starting YARN applications).

Tested by new and existing unit tests (which were moved from the YARN module to core), and by running apps with shuffle encryption enabled.

Author: Marcelo Vanzin

Closes #15981 from vanzin/SPARK-18547.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b325b17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b325b17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b325b17

Branch: refs/heads/master
Commit: 8b325b17ecdf013b7a6edcb7ee3773546bd914df
Parents: 1633ff3
Author: Marcelo Vanzin
Authored: Mon Nov 28 21:10:57 2016 -0800
Committer: Shixiong Zhu
Committed: Mon Nov 28 21:10:57 2016 -0800

--
 .../org/apache/spark/SecurityManager.scala      |  23 +---
 .../scala/org/apache/spark/SparkContext.scala   |   4 -
 .../main/scala/org/apache/spark/SparkEnv.scala  |  33 +++--
 .../executor/CoarseGrainedExecutorBackend.scala |   6 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |   7 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   6 +-
 .../spark/security/CryptoStreamUtils.scala      |  28 ++--
 .../spark/serializer/SerializerManager.scala    |  18 ++-
 .../spark/security/CryptoStreamUtilsSuite.scala | 135 +++
 docs/configuration.md                           |   3 +-
 .../spark/executor/MesosExecutorBackend.scala   |   2 +-
 .../cluster/mesos/MesosClusterManager.scala     |   4 +
 .../mesos/MesosClusterManagerSuite.scala        |  11 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |   5 -
 .../spark/deploy/yarn/IOEncryptionSuite.scala   | 108 ---
 15 files changed, 166 insertions(+), 227 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8b325b17/core/src/main/scala/org/apache/spark/SecurityManager.scala
--

diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 199365a..87fe563 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -21,7 +21,6 @@ import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
 import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
-import javax.crypto.KeyGenerator
 import javax.net.ssl._

 import com.google.common.hash.HashCodes

@@ -33,7 +32,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.sasl.SecretKeyHolder
-import org.apache.spark.security.CryptoStreamUtils._
 import org.apache.spark.util.Utils

 /**
@@ -185,7 +183,9 @@ import org.apache.spark.util.Utils
  * setting `spark.ssl.useNodeLocalConf` to `true`.
  */

-private[spark] class SecurityManager(sparkConf: SparkConf)
+private[spark] class SecurityManager(
+    sparkConf: SparkConf,
+    ioEncryptionKey: Option[Array[Byte]] = None)
   extends Logging with SecretKeyHolder {

   import SecurityManager._
@@ -415,6 +415,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
     logInfo("Changing acls enabled to: " + aclsOn)
   }

+  def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey
+
   /**
    * Generates or looks up the secret key.
    *
@@ -559,19 +561,4 @@ private[spark] object SecurityManager {

   // key used to store the spark secret in the Hadoop UGI
   val SECRET_LOOKUP_KEY = "sparkCookie"

-  /**
-   * Setup the cryptographic key used by IO encryption in credentials. The key is generated using
-   * [[KeyGenerator]]. The algorithm and key length is specified by the
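(The diff above is truncated by the archive.) With the key handed to executors at registration, over a channel that can itself be encrypted, shuffle encryption no longer depends on YARN. A hedged configuration sketch follows; the property names are taken from my reading of docs/configuration.md (which this patch touches) and should be verified against your Spark version.

```scala
import org.apache.spark.SparkConf

// Enable RPC authentication/encryption plus I/O (shuffle) encryption, so the
// key distributed at executor registration travels over a protected channel.
val conf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.enableSaslEncryption", "true")
  .set("spark.io.encryption.enabled", "true")
  .set("spark.io.encryption.keySizeBits", "256") // optional; 128 by default
```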
spark git commit: [SPARK-18588][SS][KAFKA] Ignore the flaky kafka test
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 a0c1c699e -> 45e2b3c0e

[SPARK-18588][SS][KAFKA] Ignore the flaky kafka test

## What changes were proposed in this pull request?

Ignore the flaky test to unblock other PRs while I'm debugging it.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu

Closes #16051 from zsxwing/ignore-flaky-kafka-test.

(cherry picked from commit 1633ff3b6c97e33191859f34c868782cbb0972fd)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45e2b3c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45e2b3c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45e2b3c0

Branch: refs/heads/branch-2.1
Commit: 45e2b3c0e4cd5c6e1ce6d9c99950eda726d27250
Parents: a0c1c69
Author: Shixiong Zhu
Authored: Mon Nov 28 21:04:20 2016 -0800
Committer: Shixiong Zhu
Committed: Mon Nov 28 21:04:29 2016 -0800

--
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/45e2b3c0/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index f9f6258..e1af14f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -838,7 +838,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
   }
 }

-  test("stress test for failOnDataLoss=false") {
+  ignore("stress test for failOnDataLoss=false") {
     val reader = spark
       .readStream
       .format("kafka")
spark git commit: [SPARK-18588][SS][KAFKA] Ignore the flaky kafka test
Repository: spark
Updated Branches:
  refs/heads/master e64a2047e -> 1633ff3b6

[SPARK-18588][SS][KAFKA] Ignore the flaky kafka test

## What changes were proposed in this pull request?

Ignore the flaky test to unblock other PRs while I'm debugging it.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu

Closes #16051 from zsxwing/ignore-flaky-kafka-test.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1633ff3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1633ff3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1633ff3b

Branch: refs/heads/master
Commit: 1633ff3b6c97e33191859f34c868782cbb0972fd
Parents: e64a204
Author: Shixiong Zhu
Authored: Mon Nov 28 21:04:20 2016 -0800
Committer: Shixiong Zhu
Committed: Mon Nov 28 21:04:20 2016 -0800

--
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1633ff3b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index f9f6258..e1af14f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -838,7 +838,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
   }
 }

-  test("stress test for failOnDataLoss=false") {
+  ignore("stress test for failOnDataLoss=false") {
     val reader = spark
       .readStream
       .format("kafka")
spark git commit: [SPARK-18523][PYSPARK] Make SparkContext.stop more reliable
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 cdf315ba1 -> c46928ff9

[SPARK-18523][PYSPARK] Make SparkContext.stop more reliable

## What changes were proposed in this pull request?

This PR fixes a broken SparkContext state that it may fall into if the Spark driver crashes or is killed by the OOM killer.

## How was this patch tested?

1. Start SparkContext;
2. Find the Spark driver process and `kill -9` it;
3. Call `sc.stop()`;
4. Create a new SparkContext after that.

Without this patch, step 3 crashes and step 4 is impossible without manually resetting private attributes or restarting the IPython notebook / shell.

Author: Alexander Shorin

Closes #15961 from kxepal/18523-make-spark-context-stop-more-reliable.

(cherry picked from commit 71352c94ad2a60d1695bd7ac0f4452539270e10c)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c46928ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c46928ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c46928ff

Branch: refs/heads/branch-2.1
Commit: c46928ff97371421613720a0d8d7f2baaa64bb73
Parents: cdf315b
Author: Alexander Shorin
Authored: Mon Nov 28 18:28:24 2016 -0800
Committer: Reynold Xin
Committed: Mon Nov 28 18:28:29 2016 -0800

--
 python/pyspark/context.py | 17 +++--
 1 file changed, 15 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c46928ff/python/pyspark/context.py
--

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 2fd3aee..5c4e79c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -26,6 +26,8 @@ import warnings
 from threading import RLock
 from tempfile import NamedTemporaryFile

+from py4j.protocol import Py4JError
+
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
 from pyspark.broadcast import Broadcast
@@ -373,8 +375,19 @@ class SparkContext(object):
         Shut down the SparkContext.
         """
         if getattr(self, "_jsc", None):
-            self._jsc.stop()
-            self._jsc = None
+            try:
+                self._jsc.stop()
+            except Py4JError:
+                # Case: SPARK-18523
+                warnings.warn(
+                    'Unable to cleanly shutdown Spark JVM process.'
+                    ' It is possible that the process has crashed,'
+                    ' been killed or may also be in a zombie state.',
+                    RuntimeWarning
+                )
+                pass
+            finally:
+                self._jsc = None
         if getattr(self, "_accumulatorServer", None):
             self._accumulatorServer.shutdown()
             self._accumulatorServer = None
spark git commit: [SPARK-18403][SQL] Fix unsafe data false sharing issue in ObjectHashAggregateExec
Repository: spark
Updated Branches:
  refs/heads/master 05f7c6ffa -> 2e809903d

[SPARK-18403][SQL] Fix unsafe data false sharing issue in ObjectHashAggregateExec

## What changes were proposed in this pull request?

This PR fixes a random OOM issue that occurred while running `ObjectHashAggregateSuite`. The issue can be reliably reproduced under the following conditions:

1. The aggregation must be evaluated using `ObjectHashAggregateExec`;
2. There must be an input column whose data type involves `ArrayType` (an input column of `MapType` may even cause SIGSEGV);
3. Sort-based aggregation fallback must be triggered during evaluation.

The root cause is that while falling back to sort-based aggregation, we must sort and feed already evaluated partial aggregation buffers living in the hash map to the sort-based aggregator using an external sorter. However, the underlying mutable byte buffer of the `UnsafeRow`s produced by the iterator of the external sorter is reused and may get overwritten when the iterator steps forward. After the last entry is consumed, the byte buffer points to a block of uninitialized memory filled with `5a`. Therefore, while reading an `UnsafeArrayData` out of the `UnsafeRow`, `5a5a5a5a` is treated as the array size, which triggers a memory allocation for a ridiculously large array and immediately blows up the JVM with an OOM.

To fix this issue, we only need to add `.copy()` accordingly.

## How was this patch tested?

New regression test case added in `ObjectHashAggregateSuite`.

Author: Cheng Lian

Closes #15976 from liancheng/investigate-oom.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e809903
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e809903
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e809903

Branch: refs/heads/master
Commit: 2e809903d459b5b5aa6fd882b5c4a0c915af4d43
Parents: 05f7c6f
Author: Cheng Lian
Authored: Tue Nov 29 09:01:03 2016 +0800
Committer: Wenchen Fan
Committed: Tue Nov 29 09:01:03 2016 +0800

--
 .../aggregate/ObjectAggregationIterator.scala   |  11 +-
 .../execution/ObjectHashAggregateSuite.scala    | 164 +++
 2 files changed, 101 insertions(+), 74 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/2e809903/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
index 3c7b9ee..3a7fcf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
@@ -262,7 +262,9 @@ class SortBasedAggregator(
       // Firstly, update the aggregation buffer with input rows.
       while (hasNextInput &&
           groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 0) {
-        processRow(result.aggregationBuffer, inputIterator.getValue)
+        // Since `inputIterator.getValue` is an `UnsafeRow` whose underlying buffer will be
+        // overwritten when `inputIterator` steps forward, we need to do a deep copy here.
+        processRow(result.aggregationBuffer, inputIterator.getValue.copy())
         hasNextInput = inputIterator.next()
       }

@@ -271,7 +273,12 @@ class SortBasedAggregator(
       // be called after calling processRow.
       while (hasNextAggBuffer &&
           groupingKeyOrdering.compare(initialAggBufferIterator.getKey, groupingKey) == 0) {
-        mergeAggregationBuffers(result.aggregationBuffer, initialAggBufferIterator.getValue)
+        mergeAggregationBuffers(
+          result.aggregationBuffer,
+          // Since `inputIterator.getValue` is an `UnsafeRow` whose underlying buffer will be
+          // overwritten when `inputIterator` steps forward, we need to do a deep copy here.
+          initialAggBufferIterator.getValue.copy()
+        )
         hasNextAggBuffer = initialAggBufferIterator.next()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e809903/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ObjectHashAggregateSuite.scala
--

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ObjectHashAggregateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ObjectHashAggregateSuite.scala
index
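(The last diff above is truncated by the archive.) The pitfall generalizes beyond this aggregator: any consumer that retains `UnsafeRow`s across iterator steps must copy them, because many Spark iterators return the same row object with its backing buffer rewritten in place. An illustrative sketch, not code from the patch:

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Buffer rows from an iterator that may reuse one underlying UnsafeRow.
def collectRows(iter: Iterator[UnsafeRow]): Seq[UnsafeRow] = {
  val buffered = ArrayBuffer.empty[UnsafeRow]
  while (iter.hasNext) {
    // Without .copy(), every buffered element would alias one shared buffer,
    // which may later contain garbage (e.g. the 0x5a fill described above),
    // corrupting variable-length fields such as arrays and maps.
    buffered += iter.next().copy()
  }
  buffered
}
```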
spark git commit: [SPARK-18408][ML] API Improvements for LSH
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 75d73d13e -> cdf315ba1

[SPARK-18408][ML] API Improvements for LSH

## What changes were proposed in this pull request?

(1) Change the output schema to an `Array of Vector` instead of `Vectors`
(2) Use `numHashTables` as the dimension of the Array
(3) Rename `RandomProjection` to `BucketedRandomProjectionLSH`, and `MinHash` to `MinHashLSH`
(4) Make `randUnitVectors`/`randCoefficients` private
(5) Make Multi-Probe NN Search and `hashDistance` private for future discussion

Saved for future PRs:

(1) AND-amplification and `numHashFunctions` as the dimension of Vector are saved for a future PR.
(2) `hashDistance` and Multi-Probe NN Search need more discussion. The current implementation is just a backward-compatible one.

## How was this patch tested?

Related unit tests are modified to make sure the performance of LSH is ensured, and the outputs of the APIs meet expectations.

Author: Yun Ni
Author: Yunni

Closes #15874 from Yunni/SPARK-18408-yunn-api-improvements.

(cherry picked from commit 05f7c6ffab2a6be548375cd624dc27092677232f)
Signed-off-by: Joseph K. Bradley

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cdf315ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cdf315ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cdf315ba

Branch: refs/heads/branch-2.1
Commit: cdf315ba1bd732291f05756281070eb7aa4e123f
Parents: 75d73d1
Author: Yun Ni
Authored: Mon Nov 28 15:14:46 2016 -0800
Committer: Joseph K. Bradley
Committed: Mon Nov 28 15:14:55 2016 -0800

--
 .../feature/BucketedRandomProjectionLSH.scala   | 234 +++
 .../scala/org/apache/spark/ml/feature/LSH.scala | 138 ++-
 .../org/apache/spark/ml/feature/MinHash.scala   | 195
 .../apache/spark/ml/feature/MinHashLSH.scala    | 201
 .../spark/ml/feature/RandomProjection.scala     | 225 --
 .../BucketedRandomProjectionLSHSuite.scala      | 213 +
 .../org/apache/spark/ml/feature/LSHTest.scala   |  17 +-
 .../spark/ml/feature/MinHashLSHSuite.scala      | 161 +
 .../apache/spark/ml/feature/MinHashSuite.scala  | 126 --
 .../ml/feature/RandomProjectionSuite.scala      | 197
 10 files changed, 896 insertions(+), 811 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/cdf315ba/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
--

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
new file mode 100644
index 000..cbac163
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import scala.util.Random
+
+import breeze.linalg.normalize
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared.HasSeed
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+
+/**
+ * :: Experimental ::
+ *
+ * Params for [[BucketedRandomProjectionLSH]].
+ */
+private[ml] trait BucketedRandomProjectionLSHParams extends Params {
+
+  /**
+   * The length of each hash bucket, a larger bucket lowers the false negative rate. The number of
+   * buckets will be `(max L2 norm of input vectors) / bucketLength`.
+   *
+   * If input vectors are normalized, 1-10 times of pow(numRecords, -1/inputDim) would be a
+   * reasonable value.
+   * @group param
+   */
+  val bucketLength: DoubleParam =
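(The new file above is truncated by the archive.) A hedged usage sketch of the renamed estimator follows, based on the parameters visible in this diff; the data, parameter values, and the assumption of an in-scope `spark` session are illustrative, not taken from the PR's tests.

```scala
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0))
)).toDF("id", "features")

val lsh = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)   // larger buckets lower the false negative rate
  .setNumHashTables(3)    // per change (2): numHashTables is the Array dimension
  .setInputCol("features")
  .setOutputCol("hashes") // per change (1): an Array of Vectors, one per table

val model = lsh.fit(df)
model.transform(df).show(truncate = false)

// Approximate nearest-neighbor search against a key vector.
model.approxNearestNeighbors(df, Vectors.dense(1.0, 0.5), 2).show()
```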
spark git commit: [SPARK-18408][ML] API Improvements for LSH
Repository: spark
Updated Branches:
  refs/heads/master 8b1609beb -> 05f7c6ffa

[SPARK-18408][ML] API Improvements for LSH

## What changes were proposed in this pull request?

(1) Change the output schema to an `Array of Vector` instead of `Vectors`
(2) Use `numHashTables` as the dimension of the Array
(3) Rename `RandomProjection` to `BucketedRandomProjectionLSH`, and `MinHash` to `MinHashLSH`
(4) Make `randUnitVectors`/`randCoefficients` private
(5) Make Multi-Probe NN Search and `hashDistance` private for future discussion

Saved for future PRs:

(1) AND-amplification and `numHashFunctions` as the dimension of Vector are saved for a future PR.
(2) `hashDistance` and Multi-Probe NN Search need more discussion. The current implementation is just a backward-compatible one.

## How was this patch tested?

Related unit tests are modified to make sure the performance of LSH is ensured, and the outputs of the APIs meet expectations.

Author: Yun Ni
Author: Yunni

Closes #15874 from Yunni/SPARK-18408-yunn-api-improvements.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05f7c6ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05f7c6ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05f7c6ff

Branch: refs/heads/master
Commit: 05f7c6ffab2a6be548375cd624dc27092677232f
Parents: 8b1609b
Author: Yun Ni
Authored: Mon Nov 28 15:14:46 2016 -0800
Committer: Joseph K. Bradley
Committed: Mon Nov 28 15:14:46 2016 -0800

--
 .../feature/BucketedRandomProjectionLSH.scala   | 234 +++
 .../scala/org/apache/spark/ml/feature/LSH.scala | 138 ++-
 .../org/apache/spark/ml/feature/MinHash.scala   | 195
 .../apache/spark/ml/feature/MinHashLSH.scala    | 201
 .../spark/ml/feature/RandomProjection.scala     | 225 --
 .../BucketedRandomProjectionLSHSuite.scala      | 213 +
 .../org/apache/spark/ml/feature/LSHTest.scala   |  17 +-
 .../spark/ml/feature/MinHashLSHSuite.scala      | 161 +
 .../apache/spark/ml/feature/MinHashSuite.scala  | 126 --
 .../ml/feature/RandomProjectionSuite.scala      | 197
 10 files changed, 896 insertions(+), 811 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/05f7c6ff/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
--

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
new file mode 100644
index 000..cbac163
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.ml.feature + +import scala.util.Random + +import breeze.linalg.normalize +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.linalg._ +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.HasSeed +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.StructType + +/** + * :: Experimental :: + * + * Params for [[BucketedRandomProjectionLSH]]. + */ +private[ml] trait BucketedRandomProjectionLSHParams extends Params { + + /** + * The length of each hash bucket, a larger bucket lowers the false negative rate. The number of + * buckets will be `(max L2 norm of input vectors) / bucketLength`. + * + * + * If input vectors are normalized, 1-10 times of pow(numRecords, -1/inputDim) would be a + * reasonable value + * @group param + */ + val bucketLength: DoubleParam = new DoubleParam(this, "bucketLength", +"the length of each hash bucket, a larger bucket lowers the false negative rate.", +
spark git commit: [SPARK-18553][CORE][BRANCH-2.0] Fix leak of TaskSetManager following executor loss
Repository: spark Updated Branches: refs/heads/branch-2.0 f158045fd -> 9ff03fa23 [SPARK-18553][CORE][BRANCH-2.0] Fix leak of TaskSetManager following executor loss ## What changes were proposed in this pull request? This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails. This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map. Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is [some code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338) in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and [`removeExecutor`](https://github.com/apache /spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here. This patch's fix is to maintain a `executorId -> running task id` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss. There are some potential corner-case interactions that I'm concerned about here, especially some details in [the comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523) in `removeExecutor`, so I'd appreciate a very careful review of these changes. This PR is opened against branch-2.0, where I first observed this problem, but will also need to be fixed in master, branch-2.1, and branch-1.6 (which I'll do in followup PRs after this fix is reviewed and merged). ## How was this patch tested? I added a new unit test to `TaskSchedulerImplSuite`. You can check out this PR as of 25e455e711b978cd331ee0f484f70fde31307634 to see the failing test. cc kayousterhout, markhamstra, rxin for review. Author: Josh RosenCloses #15986 from JoshRosen/fix-leak-following-total-executor-loss. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ff03fa2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ff03fa2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ff03fa2 Branch: refs/heads/branch-2.0 Commit: 9ff03fa23e664bc0241914c7b5a7bda0c38eec15 Parents: f158045 Author: Josh Rosen Authored: Mon Nov 28 13:17:24 2016 -0800 Committer: Josh Rosen Committed: Mon Nov 28 13:17:24 2016 -0800 -- .../spark/scheduler/TaskSchedulerImpl.scala | 80 .../StandaloneDynamicAllocationSuite.scala | 7 +- .../scheduler/TaskSchedulerImplSuite.scala | 68 + 3 files changed, 120 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9ff03fa2/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d22321b..b2ef41e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -88,10 +88,12 @@ private[spark] class TaskSchedulerImpl( // Incrementing task IDs val nextTaskId = new AtomicLong(0) - // Number of tasks running on each executor - private
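The fix's bookkeeping can be sketched as follows: a minimal, self-contained illustration in which the map name `executorIdToRunningTaskIds` mirrors the intent of the patch, while the surrounding class is a simplification, not the actual `TaskSchedulerImpl` code:

```scala
import scala.collection.mutable.{HashMap, HashSet}

class TaskBookkeepingSketch {
  // taskId -> executor it runs on; stands in for the taskId -> * maps that leaked.
  private val taskIdToExecutorId = new HashMap[Long, String]
  // The new reverse index: executorId -> ids of tasks still running on it.
  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  def taskLaunched(taskId: Long, executorId: String): Unit = {
    taskIdToExecutorId(taskId) = executorId
    executorIdToRunningTaskIds.getOrElseUpdate(executorId, new HashSet[Long]) += taskId
  }

  // Normal path: statusUpdate() removes a finished task from both maps.
  def taskFinished(taskId: Long): Unit = {
    taskIdToExecutorId.remove(taskId).foreach { execId =>
      executorIdToRunningTaskIds.get(execId).foreach(_ -= taskId)
    }
  }

  // New path: on total executor loss, drop state for every task that was still
  // running there, so no zombie TaskSetManager stays reachable via taskId -> * maps.
  def executorLost(executorId: String): Unit = {
    executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
      taskIds.foreach(taskIdToExecutorId.remove)
    }
  }
}
```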
spark git commit: [SPARK-18117][CORE] Add test for TaskSetBlacklist
Repository: spark Updated Branches: refs/heads/master ad67993b7 -> 8b1609beb [SPARK-18117][CORE] Add test for TaskSetBlacklist ## What changes were proposed in this pull request? This adds tests to verify the interaction between TaskSetBlacklist and TaskSchedulerImpl. TaskSetBlacklist was introduced by SPARK-17675, but that change neglected to add these tests. This change does not fix any bugs -- it is just for increasing test coverage. ## How was this patch tested? Jenkins Author: Imran Rashid. Closes #15644 from squito/taskset_blacklist_test_update. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b1609be Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b1609be Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b1609be Branch: refs/heads/master Commit: 8b1609bebe489b2ef78db4be6e9836687089fe3d Parents: ad67993 Author: Imran Rashid Authored: Mon Nov 28 13:47:09 2016 -0600 Committer: Imran Rashid Committed: Mon Nov 28 13:47:09 2016 -0600 -- .../apache/spark/scheduler/TaskSetManager.scala | 2 +- .../scheduler/TaskSchedulerImplSuite.scala | 254 ++- .../spark/scheduler/TaskSetManagerSuite.scala | 45 +++- 3 files changed, 292 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8b1609be/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b766e41..f2a432c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -84,7 +84,7 @@ private[spark] class TaskSetManager( var totalResultSize = 0L var calculatedTasks = 0 - private val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = { + private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = { if (BlacklistTracker.isBlacklistEnabled(conf)) { Some(new TaskSetBlacklist(conf, stageId, clock)) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/8b1609be/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index f5f1947..5dc7708 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -17,7 +17,12 @@ package org.apache.spark.scheduler +import scala.collection.mutable.HashMap + +import org.mockito.Matchers.{anyInt, anyString, eq => meq} +import org.mockito.Mockito.{atLeast, atMost, never, spy, verify, when} import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar import org.apache.spark._ import org.apache.spark.internal.config @@ -31,7 +36,7 @@ class FakeSchedulerBackend extends SchedulerBackend { } class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach -with Logging { +with Logging with MockitoSugar { var failedTaskSetException: Option[Throwable] = None var failedTaskSetReason: String = null @@ -40,11 +45,16 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B var taskScheduler: TaskSchedulerImpl = null var dagScheduler: DAGScheduler = null + val stageToMockTaskSetBlacklist = new HashMap[Int, TaskSetBlacklist]() + val stageToMockTaskSetManager = new
HashMap[Int, TaskSetManager]() + override def beforeEach(): Unit = { super.beforeEach() failedTaskSet = false failedTaskSetException = None failedTaskSetReason = null +stageToMockTaskSetBlacklist.clear() +stageToMockTaskSetManager.clear() } override def afterEach(): Unit = { @@ -66,6 +76,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } sc = new SparkContext(conf) taskScheduler = new TaskSchedulerImpl(sc) +setupHelper() + } + + def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = { +val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite") +conf.set(config.BLACKLIST_ENABLED, true) +sc = new SparkContext(conf) +taskScheduler = + new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) { +override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = { + val
[1/2] spark git commit: Preparing Spark release v2.1.0-rc1
Repository: spark Updated Branches: refs/heads/branch-2.1 b386943b2 -> 75d73d13e Preparing Spark release v2.1.0-rc1 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80aabc0b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80aabc0b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80aabc0b Branch: refs/heads/branch-2.1 Commit: 80aabc0bd33dc5661a90133156247e7a8c1bf7f5 Parents: b386943 Author: Patrick Wendell Authored: Mon Nov 28 11:48:12 2016 -0800 Committer: Patrick Wendell Committed: Mon Nov 28 11:48:12 2016 -0800 -- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 2 +- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mesos/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- python/pyspark/version.py | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 38 files changed, 38 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/80aabc0b/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index ec243ea..aebfd12 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.1.0-SNAPSHOT +2.1.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/80aabc0b/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index fcefe64..67d78d5 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.0-SNAPSHOT +2.1.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/80aabc0b/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 511e1f2..9379097 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.0-SNAPSHOT +2.1.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/80aabc0b/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 606ad15..53cb8dd 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.0-SNAPSHOT +2.1.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/80aabc0b/common/sketch/pom.xml -- diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index 626f023..89bee85 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.0-SNAPSHOT +2.1.0 ../../pom.xml
[2/2] spark git commit: Preparing development version 2.1.1-SNAPSHOT
Preparing development version 2.1.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75d73d13 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75d73d13 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75d73d13 Branch: refs/heads/branch-2.1 Commit: 75d73d13e82aa88a7043d60b041b97fdb19e49b9 Parents: 80aabc0 Author: Patrick Wendell Authored: Mon Nov 28 11:48:21 2016 -0800 Committer: Patrick Wendell Committed: Mon Nov 28 11:48:21 2016 -0800 -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mesos/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- python/pyspark/version.py | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 39 files changed, 40 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/75d73d13/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 981ae12..46fb178 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,7 +1,7 @@ Package: SparkR Type: Package Title: R Frontend for Apache Spark -Version: 2.1.0 +Version: 2.1.1 Date: 2016-11-06 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), email = "shiva...@cs.berkeley.edu"), http://git-wip-us.apache.org/repos/asf/spark/blob/75d73d13/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index aebfd12..29522fd 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.1.0 +2.1.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/75d73d13/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 67d78d5..85644c4 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.0 +2.1.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/75d73d13/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 9379097..e15ede9 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.0 +2.1.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/75d73d13/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 53cb8dd..c93a355 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.0 +
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.1.0-rc1 [created] 80aabc0bd
spark git commit: [SPARK-18597][SQL] Do not push-down join conditions to the left side of a Left Anti join [BRANCH-2.0]
Repository: spark Updated Branches: refs/heads/branch-2.0 759bd4a6a -> f158045fd [SPARK-18597][SQL] Do not push-down join conditions to the left side of a Left Anti join [BRANCH-2.0] ## What changes were proposed in this pull request? We currently push down join conditions of a Left Anti join to both sides of the join. This is similar to Inner, Left Semi and Existence (a specialized left semi) join. The problem is that this changes the semantics of the join; a left anti join filters out rows that match the join condition. This PR fixes this by only pushing down conditions to the right hand side of the join. This is similar to the behavior of left outer join. This PR is a backport of https://github.com/apache/spark/pull/16026 ## How was this patch tested? Added tests to `FilterPushdownSuite.scala` and created a SQLQueryTestSuite file for left anti joins with a regression test. Author: Herman van Hovell. Closes #16039 from hvanhovell/SPARK-18597-branch-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f158045f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f158045f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f158045f Branch: refs/heads/branch-2.0 Commit: f158045fde14b3018493e9059e9dcb2095f11c54 Parents: 759bd4a Author: Herman van Hovell Authored: Mon Nov 28 11:20:59 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 11:20:59 2016 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 6 ++-- .../optimizer/FilterPushdownSuite.scala | 33 .../resources/sql-tests/inputs/anti-join.sql| 7 + .../sql-tests/results/anti-join.sql.out | 29 + 4 files changed, 72 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f158045f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0a28ef4..3a71463 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1289,7 +1289,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case Inner | LeftExistence(_) => +case Inner | LeftSemi | ExistenceJoin(_) => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) @@ -1306,14 +1306,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And) Join(newLeft, newRight, RightOuter, newJoinCond) -case LeftOuter => +case LeftOuter | LeftAnti => // push down the right side only join filter for right sub query val newLeft = left val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) - Join(newLeft, newRight, LeftOuter, newJoinCond) + Join(newLeft, newRight, joinType, newJoinCond) case FullOuter => j case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node") case UsingJoin(_, _) => sys.error("Untransformed Using join node") http://git-wip-us.apache.org/repos/asf/spark/blob/f158045f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 019f132..3e67282 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) } + test("joins: push down where clause into left anti join") { +val
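To make the semantic point above concrete, a hedged illustration (hypothetical data, assuming a SparkSession `spark` with its implicits imported; this is not one of the tests from the PR):

```scala
import spark.implicits._

Seq(1, 2, 3).toDF("a").createOrReplaceTempView("l")
Seq(2).toDF("b").createOrReplaceTempView("r")

// Correct LEFT ANTI semantics: keep rows of l with no match under the full
// condition. a = 1 never satisfies (a = b AND a > 1), so rows 1 and 3 survive.
spark.sql("SELECT * FROM l LEFT ANTI JOIN r ON l.a = r.b AND l.a > 1").show()

// Pushing the left-side-only predicate a > 1 below the join would first shrink
// l to {2, 3} and wrongly drop the row a = 1 from the anti-join result as well.
```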
spark git commit: [SPARK-17680][SQL][TEST] Added test cases for InMemoryRelation
Repository: spark Updated Branches: refs/heads/branch-2.1 81e3f9711 -> b386943b2 [SPARK-17680][SQL][TEST] Added test cases for InMemoryRelation ## What changes were proposed in this pull request? This pull request adds test cases for the following cases: - keep all data types with null or without null - access `CachedBatch` disabling whole stage codegen - access only some columns in `CachedBatch` This PR is a part of https://github.com/apache/spark/pull/15219. Here are the motivations for adding these tests. When https://github.com/apache/spark/pull/15219 is enabled, the first two cases are handled by specialized (generated) code. The third one is a pitfall. In general, even for now, it would be helpful to increase test coverage. ## How was this patch tested? The added test suites themselves. Author: Kazuaki Ishizaki. Closes #15462 from kiszk/columnartestsuites. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b386943b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b386943b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b386943b Branch: refs/heads/branch-2.1 Commit: b386943b2fe6af5237270bfa520295c1711bb341 Parents: 81e3f97 Author: Kazuaki Ishizaki Authored: Mon Nov 28 14:06:37 2016 -0500 Committer: Andrew Or Committed: Mon Nov 28 14:07:34 2016 -0500 -- .../columnar/InMemoryColumnarQuerySuite.scala | 148 ++- 1 file changed, 146 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b386943b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index b272c8e..afeb478 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -20,18 +20,96 @@ package org.apache.spark.sql.execution.columnar import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ -import org.apache.spark.storage.StorageLevel.MEMORY_ONLY +import org.apache.spark.storage.StorageLevel._ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { import testImplicits._ setupTestData() + private def cachePrimitiveTest(data: DataFrame, dataType: String) { +data.createOrReplaceTempView(s"testData$dataType") +val storageLevel = MEMORY_ONLY +val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan +val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None) + +assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == storageLevel) +inMemoryRelation.cachedColumnBuffers.collect().head match { + case _: CachedBatch => + case other => fail(s"Unexpected cached batch type: ${other.getClass.getName}") +} +checkAnswer(inMemoryRelation, data.collect().toSeq) + } + + private def testPrimitiveType(nullability: Boolean): Unit = { +val dataTypes = Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, + FloatType, DoubleType, DateType, TimestampType, DecimalType(25, 5), DecimalType(6,
5)) +val schema = StructType(dataTypes.zipWithIndex.map { case (dataType, index) => + StructField(s"col$index", dataType, nullability) +}) +val rdd = spark.sparkContext.parallelize((1 to 10).map(i => Row( + if (nullability && i % 3 == 0) null else if (i % 2 == 0) true else false, + if (nullability && i % 3 == 0) null else i.toByte, + if (nullability && i % 3 == 0) null else i.toShort, + if (nullability && i % 3 == 0) null else i.toInt, + if (nullability && i % 3 == 0) null else i.toLong, + if (nullability && i % 3 == 0) null else (i + 0.25).toFloat, + if (nullability && i % 3 == 0) null else (i + 0.75).toDouble, + if (nullability && i % 3 == 0) null else new Date(i), + if (nullability && i % 3 == 0) null else new Timestamp(i * 100L), + if (nullability && i % 3 == 0) null else BigDecimal(Long.MaxValue.toString + ".12345"), + if (nullability && i % 3 == 0) null + else new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456") +))) +
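A hedged sketch of the third case above, the partial-column access pitfall; it assumes a SparkSession `spark` and illustrates the access pattern only, rather than reproducing one of the added tests:

```scala
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("i", "s").cache()
df.count()               // materializes the in-memory CachedBatches
df.select("s").collect() // reads back only some of the cached columns
```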
spark git commit: [SPARK-16282][SQL] Implement percentile SQL function.
Repository: spark Updated Branches: refs/heads/branch-2.1 4d7947856 -> 81e3f9711 [SPARK-16282][SQL] Implement percentile SQL function. ## What changes were proposed in this pull request? Implement percentile SQL function. It computes the exact percentile(s) of `expr` at the given percentage(s) `pc`, each in the range [0, 1]. ## How was this patch tested? Add a new test suite `PercentileSuite` to test percentile directly. Updated related test cases in `ExpressionToSQLSuite`. Author: jiangxingbo. Closes #14136 from jiangxb1987/percentile. (cherry picked from commit 0f5f52a3d1e5dcf5b970c49e324e322b9deb00f3) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81e3f971 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81e3f971 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81e3f971 Branch: refs/heads/branch-2.1 Commit: 81e3f9711da5758fdeb297fe057685f648b6458b Parents: 4d79478 Author: jiangxingbo Authored: Mon Nov 28 11:05:58 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 11:06:12 2016 -0800 -- .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/aggregate/Percentile.scala | 269 +++ .../expressions/aggregate/PercentileSuite.scala | 245 + .../spark/sql/hive/HiveSessionCatalog.scala | 3 +- .../sql/catalyst/ExpressionToSQLSuite.scala | 2 + 5 files changed, 518 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/81e3f971/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 007cdc1..2636afe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -249,6 +249,7 @@ object FunctionRegistry { expression[Max]("max"), expression[Average]("mean"), expression[Min]("min"), +expression[Percentile]("percentile"), expression[Skewness]("skewness"), expression[ApproximatePercentile]("percentile_approx"), expression[StddevSamp]("std"), http://git-wip-us.apache.org/repos/asf/spark/blob/81e3f971/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala new file mode 100644 index 000..356e088 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} +import java.util + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.collection.OpenHashMap + +/** + * The Percentile aggregate function returns the exact percentile(s) of numeric column `expr` at + * the given percentage(s) with value
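For reference, a small usage sketch of the new function, assuming a SparkSession named `spark`; the table and values are illustrative, not taken from the test suite:

```scala
val df = spark.range(1, 11).toDF("v")
df.createOrReplaceTempView("t")

// A single percentage yields one exact value (here the median of 1..10).
spark.sql("SELECT percentile(v, 0.5) FROM t").show()

// An array of percentages is evaluated in one pass; each must lie in [0.0, 1.0].
spark.sql("SELECT percentile(v, array(0.25, 0.5, 0.75)) FROM t").show()
```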
spark git commit: [SPARK-17680][SQL][TEST] Added test cases for InMemoryRelation
Repository: spark Updated Branches: refs/heads/master 0f5f52a3d -> ad67993b7 [SPARK-17680][SQL][TEST] Added test cases for InMemoryRelation ## What changes were proposed in this pull request? This pull request adds test cases for the following cases: - keep all data types with null or without null - access `CachedBatch` disabling whole stage codegen - access only some columns in `CachedBatch` This PR is a part of https://github.com/apache/spark/pull/15219. Here are the motivations for adding these tests. When https://github.com/apache/spark/pull/15219 is enabled, the first two cases are handled by specialized (generated) code. The third one is a pitfall. In general, even for now, it would be helpful to increase test coverage. ## How was this patch tested? The added test suites themselves. Author: Kazuaki Ishizaki. Closes #15462 from kiszk/columnartestsuites. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad67993b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad67993b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad67993b Branch: refs/heads/master Commit: ad67993b73490a24e7012df23810dab1712e7689 Parents: 0f5f52a Author: Kazuaki Ishizaki Authored: Mon Nov 28 14:06:37 2016 -0500 Committer: Andrew Or Committed: Mon Nov 28 14:06:37 2016 -0500 -- .../columnar/InMemoryColumnarQuerySuite.scala | 148 ++- 1 file changed, 146 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ad67993b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index b272c8e..afeb478 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -20,18 +20,96 @@ package org.apache.spark.sql.execution.columnar import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ -import org.apache.spark.storage.StorageLevel.MEMORY_ONLY +import org.apache.spark.storage.StorageLevel._ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { import testImplicits._ setupTestData() + private def cachePrimitiveTest(data: DataFrame, dataType: String) { +data.createOrReplaceTempView(s"testData$dataType") +val storageLevel = MEMORY_ONLY +val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan +val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None) + +assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == storageLevel) +inMemoryRelation.cachedColumnBuffers.collect().head match { + case _: CachedBatch => + case other => fail(s"Unexpected cached batch type: ${other.getClass.getName}") +} +checkAnswer(inMemoryRelation, data.collect().toSeq) + } + + private def testPrimitiveType(nullability: Boolean): Unit = { +val dataTypes = Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, + FloatType, DoubleType, DateType, TimestampType, DecimalType(25, 5), DecimalType(6, 5)) +val
schema = StructType(dataTypes.zipWithIndex.map { case (dataType, index) => + StructField(s"col$index", dataType, nullability) +}) +val rdd = spark.sparkContext.parallelize((1 to 10).map(i => Row( + if (nullability && i % 3 == 0) null else if (i % 2 == 0) true else false, + if (nullability && i % 3 == 0) null else i.toByte, + if (nullability && i % 3 == 0) null else i.toShort, + if (nullability && i % 3 == 0) null else i.toInt, + if (nullability && i % 3 == 0) null else i.toLong, + if (nullability && i % 3 == 0) null else (i + 0.25).toFloat, + if (nullability && i % 3 == 0) null else (i + 0.75).toDouble, + if (nullability && i % 3 == 0) null else new Date(i), + if (nullability && i % 3 == 0) null else new Timestamp(i * 100L), + if (nullability && i % 3 == 0) null else BigDecimal(Long.MaxValue.toString + ".12345"), + if (nullability && i % 3 == 0) null + else new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456") +))) +
spark git commit: [SPARK-16282][SQL] Implement percentile SQL function.
Repository: spark Updated Branches: refs/heads/master 185642846 -> 0f5f52a3d [SPARK-16282][SQL] Implement percentile SQL function. ## What changes were proposed in this pull request? Implement percentile SQL function. It computes the exact percentile(s) of `expr` at the given percentage(s) `pc`, each in the range [0, 1]. ## How was this patch tested? Add a new test suite `PercentileSuite` to test percentile directly. Updated related test cases in `ExpressionToSQLSuite`. Author: jiangxingbo. Closes #14136 from jiangxb1987/percentile. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f5f52a3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f5f52a3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f5f52a3 Branch: refs/heads/master Commit: 0f5f52a3d1e5dcf5b970c49e324e322b9deb00f3 Parents: 1856428 Author: jiangxingbo Authored: Mon Nov 28 11:05:58 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 11:05:58 2016 -0800 -- .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/aggregate/Percentile.scala | 269 +++ .../expressions/aggregate/PercentileSuite.scala | 245 + .../spark/sql/hive/HiveSessionCatalog.scala | 3 +- .../sql/catalyst/ExpressionToSQLSuite.scala | 2 + 5 files changed, 518 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0f5f52a3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 007cdc1..2636afe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -249,6 +249,7 @@ object FunctionRegistry { expression[Max]("max"), expression[Average]("mean"), expression[Min]("min"), +expression[Percentile]("percentile"), expression[Skewness]("skewness"), expression[ApproximatePercentile]("percentile_approx"), expression[StddevSamp]("std"), http://git-wip-us.apache.org/repos/asf/spark/blob/0f5f52a3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala new file mode 100644 index 000..356e088 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} +import java.util + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.collection.OpenHashMap + +/** + * The Percentile aggregate function returns the exact percentile(s) of numeric column `expr` at + * the given percentage(s) with value range in [0.0, 1.0]. + * + * The operator is bound to the slower sort based aggregation path because the number of elements + * and their
spark git commit: [SQL][MINOR] DESC should use 'Catalog' as partition provider
Repository: spark Updated Branches: refs/heads/master eba727757 -> 185642846 [SQL][MINOR] DESC should use 'Catalog' as partition provider ## What changes were proposed in this pull request? `CatalogTable` has a parameter named `tracksPartitionsInCatalog`, and in `CatalogTable.toString` we use `"Partition Provider: Catalog"` to represent it. This PR fixes `DESC TABLE` to make it consistent with `CatalogTable.toString`. ## How was this patch tested? N/A Author: Wenchen Fan. Closes #16035 from cloud-fan/minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18564284 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18564284 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18564284 Branch: refs/heads/master Commit: 185642846e25fa812f9c7f398ab20bffc1e25273 Parents: eba7277 Author: Wenchen Fan Authored: Mon Nov 28 10:57:17 2016 -0800 Committer: Reynold Xin Committed: Mon Nov 28 10:57:17 2016 -0800 -- .../main/scala/org/apache/spark/sql/execution/command/tables.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/18564284/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index ca4d20a..57d66f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -489,7 +489,7 @@ case class DescribeTableCommand( if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer) if (DDLUtils.isDatasourceTable(table) && table.tracksPartitionsInCatalog) { - append(buffer, "Partition Provider:", "Hive", "") + append(buffer, "Partition Provider:", "Catalog", "") } }
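A hedged sketch of where the corrected label surfaces; it assumes a SparkSession `spark` with Spark 2.1 defaults (catalog-tracked partitions for datasource tables) and an illustrative table name:

```scala
spark.sql("CREATE TABLE t (a INT, p INT) USING parquet PARTITIONED BY (p)")
// For such a table, the extended description should now report
// "Partition Provider: Catalog" instead of the old, misleading "Hive".
spark.sql("DESC EXTENDED t").filter("col_name = 'Partition Provider:'").show()
```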
spark git commit: [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino
Repository: spark Updated Branches: refs/heads/branch-2.1 32b259fae -> 34ad4d520 [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino ## What changes were proposed in this pull request? org.codehaus.janino:janino depends on org.codehaus.janino:commons-compiler and we have upgraded to org.codehaus.janino:janino 3.0.0. However, it seems we are still pulling in org.codehaus.janino:commons-compiler 2.7.6 because of calcite. It looks like an accident because we exclude janino from calcite (see here https://github.com/apache/spark/blob/branch-2.1/pom.xml#L1759). So, this PR upgrades org.codehaus.janino:commons-compiler to 3.0.0. ## How was this patch tested? jenkins Author: Yin Huai. Closes #16025 from yhuai/janino-commons-compile. (cherry picked from commit eba727757ed5dc23c635e1926795aea62ec0fc66) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34ad4d52 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34ad4d52 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34ad4d52 Branch: refs/heads/branch-2.1 Commit: 34ad4d520ae0e4302972097c5985ab2c5a8d5e04 Parents: 32b259f Author: Yin Huai Authored: Mon Nov 28 10:09:30 2016 -0800 Committer: Yin Huai Committed: Mon Nov 28 10:09:50 2016 -0800 -- dev/deps/spark-deps-hadoop-2.2 | 2 +- dev/deps/spark-deps-hadoop-2.3 | 2 +- dev/deps/spark-deps-hadoop-2.4 | 2 +- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- pom.xml| 9 + sql/catalyst/pom.xml | 4 7 files changed, 18 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.2 -- diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2 index bbdea06..89bfcef 100644 --- a/dev/deps/spark-deps-hadoop-2.2 +++ b/dev/deps/spark-deps-hadoop-2.2 @@ -24,7 +24,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.3 -- diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 index a2dec41..8df3858 100644 --- a/dev/deps/spark-deps-hadoop-2.3 +++ b/dev/deps/spark-deps-hadoop-2.3 @@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.4 -- diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index c1f02b9..71e7fb6 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 4f04636..ba31391 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -31,7
+31,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.7 -- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index da3af9f..b129e5a 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar
spark git commit: [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino
Repository: spark Updated Branches: refs/heads/master 237c3b964 -> eba727757 [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino ## What changes were proposed in this pull request? org.codehaus.janino:janino depends on org.codehaus.janino:commons-compiler and we have upgraded to org.codehaus.janino:janino 3.0.0. However, it seems we are still pulling in org.codehaus.janino:commons-compiler 2.7.6 because of calcite. It looks like an accident because we exclude janino from calcite (see here https://github.com/apache/spark/blob/branch-2.1/pom.xml#L1759). So, this PR upgrades org.codehaus.janino:commons-compiler to 3.0.0. ## How was this patch tested? jenkins Author: Yin Huai. Closes #16025 from yhuai/janino-commons-compile. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eba72775 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eba72775 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eba72775 Branch: refs/heads/master Commit: eba727757ed5dc23c635e1926795aea62ec0fc66 Parents: 237c3b9 Author: Yin Huai Authored: Mon Nov 28 10:09:30 2016 -0800 Committer: Yin Huai Committed: Mon Nov 28 10:09:30 2016 -0800 -- dev/deps/spark-deps-hadoop-2.2 | 2 +- dev/deps/spark-deps-hadoop-2.3 | 2 +- dev/deps/spark-deps-hadoop-2.4 | 2 +- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- pom.xml| 9 + sql/catalyst/pom.xml | 4 7 files changed, 18 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.2 -- diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2 index bbdea06..89bfcef 100644 --- a/dev/deps/spark-deps-hadoop-2.2 +++ b/dev/deps/spark-deps-hadoop-2.2 @@ -24,7 +24,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.3 -- diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 index a2dec41..8df3858 100644 --- a/dev/deps/spark-deps-hadoop-2.3 +++ b/dev/deps/spark-deps-hadoop-2.3 @@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.4 -- diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index c1f02b9..71e7fb6 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 4f04636..ba31391 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar
commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.7 -- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index da3af9f..b129e5a 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/pom.xml
spark git commit: [SPARK-18535][UI][YARN] Redact sensitive information from Spark logs and UI
Repository: spark Updated Branches: refs/heads/master d31ff9b7c -> 237c3b964 [SPARK-18535][UI][YARN] Redact sensitive information from Spark logs and UI ## What changes were proposed in this pull request? This patch adds a new property called `spark.redaction.regex` that allows users to specify a Scala regex to decide which Spark configuration properties and environment variables in driver and executor environments contain sensitive information. When this regex matches the property or environment variable name, its value is redacted from the environment UI and various logs like YARN and event logs. This change uses this property to redact information from event logs and YARN logs. It also updates the UI code to adhere to this property instead of hardcoding the logic to decipher which properties are sensitive. Here's an image of the UI post-redaction: ![image](https://cloud.githubusercontent.com/assets/1709451/20506215/4cc30654-b007-11e6-8aee-4cde253fba2f.png) Here's the text in the YARN logs, post-redaction: ``HADOOP_CREDSTORE_PASSWORD -> *(redacted)`` Here's the text in the event logs, post-redaction: ``...,"spark.executorEnv.HADOOP_CREDSTORE_PASSWORD":"*(redacted)","spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD":"*(redacted)",...`` ## How was this patch tested? 1. Unit tests are added to ensure that redaction works. 2. A YARN job reading data off of S3 with confidential information (hadoop credential provider password) being provided in the environment variables of driver and executor. And, afterwards, logs were grepped to make sure that no mention of secret password was present. It was also ensured that the job was able to read the data off of S3 correctly, thereby ensuring that the sensitive information was being trickled down to the right places to read the data. 3. The event logs were checked to make sure no mention of secret password was present. 4. UI environment tab was checked to make sure there was no secret information being displayed. Author: Mark Grover. Closes #15971 from markgrover/master_redaction.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/237c3b96 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/237c3b96 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/237c3b96 Branch: refs/heads/master Commit: 237c3b9642a1a7c5e7884824b21877590d5d0b3b Parents: d31ff9b Author: Mark Grover Authored: Mon Nov 28 08:59:47 2016 -0800 Committer: Marcelo Vanzin Committed: Mon Nov 28 08:59:47 2016 -0800 -- .../apache/spark/internal/config/package.scala | 9 + .../spark/scheduler/EventLoggingListener.scala | 13 - .../apache/spark/ui/env/EnvironmentPage.scala | 12 .../apache/spark/ui/env/EnvironmentTab.scala| 1 + .../scala/org/apache/spark/util/Utils.scala | 14 +- .../scheduler/EventLoggingListenerSuite.scala | 12 .../org/apache/spark/util/UtilsSuite.scala | 20 docs/configuration.md | 9 + .../spark/deploy/yarn/ExecutorRunnable.scala| 3 +-- 9 files changed, 81 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/237c3b96/core/src/main/scala/org/apache/spark/internal/config/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 2951bdc..a69a2b5 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -223,4 +223,13 @@ package object config { " bigger files.") .longConf .createWithDefault(4 * 1024 * 1024) + + private[spark] val SECRET_REDACTION_PATTERN = +ConfigBuilder("spark.redaction.regex") + .doc("Regex to decide which Spark configuration properties and environment variables in " + +"driver and executor environments contain sensitive information. When this regex matches " + +"a property, its value is redacted from the environment UI and various logs like YARN " + +"and event logs.") + .stringConf + .createWithDefault("(?i)secret|password") } http://git-wip-us.apache.org/repos/asf/spark/blob/237c3b96/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index ce78774..f39565e 100644 ---
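The mechanism can be sketched as follows: a minimal, self-contained illustration assuming the default pattern `(?i)secret|password`; the object and method names are illustrative, not the exact ones added to `Utils`:

```scala
import scala.util.matching.Regex

object RedactionSketch {
  val ReplacementText = "*********(redacted)"

  // Redact the value of any key/value pair whose key matches the pattern.
  def redact(regex: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] =
    kvs.map { case (key, value) =>
      if (regex.findFirstIn(key).isDefined) (key, ReplacementText) else (key, value)
    }

  def main(args: Array[String]): Unit = {
    val regex = "(?i)secret|password".r
    val env = Seq(
      "HADOOP_CREDSTORE_PASSWORD" -> "hunter2",
      "SPARK_HOME" -> "/opt/spark")
    redact(regex, env).foreach(println)
    // (HADOOP_CREDSTORE_PASSWORD,*********(redacted))
    // (SPARK_HOME,/opt/spark)
  }
}
```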
spark git commit: [SPARK-17732][SQL] Revert ALTER TABLE DROP PARTITION should support comparators
Repository: spark Updated Branches: refs/heads/master 38e29824d -> d31ff9b7c [SPARK-17732][SQL] Revert ALTER TABLE DROP PARTITION should support comparators ## What changes were proposed in this pull request? The change in https://github.com/apache/spark/pull/15704 fails if we use an int literal in `DROP PARTITION`, and we have reverted it in branch-2.1. This PR reverts it in the master branch and adds a regression test, to make sure the master branch is healthy. ## How was this patch tested? New regression test. Author: Wenchen Fan. Closes #16036 from cloud-fan/revert. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d31ff9b7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d31ff9b7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d31ff9b7 Branch: refs/heads/master Commit: d31ff9b7caf4eba66724947b68f517072e6a011c Parents: 38e2982 Author: Wenchen Fan Authored: Mon Nov 28 08:46:00 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 08:46:00 2016 -0800 -- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 6 +- .../spark/sql/catalyst/parser/AstBuilder.scala | 30 +- .../spark/sql/execution/SparkSqlParser.scala| 2 +- .../spark/sql/execution/command/ddl.scala | 51 ++--- .../datasources/DataSourceStrategy.scala| 8 +- .../sql/execution/command/DDLCommandSuite.scala | 9 +- .../spark/sql/execution/command/DDLSuite.scala | 14 ++- .../spark/sql/hive/execution/HiveDDLSuite.scala | 103 --- 8 files changed, 34 insertions(+), 189 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d31ff9b7/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 -- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 4531fe4..df85c70 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -235,7 +235,11 @@ partitionSpecLocation ; partitionSpec -: PARTITION '(' expression (',' expression)* ')' +: PARTITION '(' partitionVal (',' partitionVal)* ')' +; + +partitionVal +: identifier (EQ constant)?
; describeFuncName http://git-wip-us.apache.org/repos/asf/spark/blob/d31ff9b7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 8e8d374..3fa7bf1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -194,15 +194,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { */ override def visitPartitionSpec( ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) { -val parts = ctx.expression.asScala.map { pVal => - expression(pVal) match { -case UnresolvedAttribute(name :: Nil) => - name -> None -case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: Literal) => - name -> Option(constant.toString) -case _ => - throw new ParseException("Invalid partition filter specification", ctx) - } +val parts = ctx.partitionVal.asScala.map { pVal => + val name = pVal.identifier.getText + val value = Option(pVal.constant).map(visitStringConstant) + name -> value } // Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values // in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for @@ -212,23 +207,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { } /** - * Create a partition filter specification. - */ - def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { -val parts = ctx.expression.asScala.map { pVal => - expression(pVal) match { -case EqualNullSafe(_, _) => - throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) -case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => - cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) -case _ => - throw new ParseException("Invalid
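A hedged, regression-style example of the restored grammar; it assumes a SparkSession `spark` with Hive support enabled, and the table and partition names are illustrative:

```scala
spark.sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter INT)")
spark.sql("ALTER TABLE sales ADD PARTITION (country='US', quarter=1)")
// With partitionVal back in the grammar, the unquoted int literal parses again:
spark.sql("ALTER TABLE sales DROP PARTITION (country='US', quarter=1)")
```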
spark git commit: [SPARK-18597][SQL] Do not push-down join conditions to the left side of a Left Anti join
Repository: spark Updated Branches: refs/heads/master 9f273c517 -> 38e29824d [SPARK-18597][SQL] Do not push-down join conditions to the left side of a Left Anti join ## What changes were proposed in this pull request? We currently push down join conditions of a Left Anti join to both sides of the join. This is similar to Inner, Left Semi and Existence (a specialized left semi) join. The problem is that this changes the semantics of the join; a left anti join filters out rows that match the join condition. This PR fixes this by only pushing down conditions to the right hand side of the join. This is similar to the behavior of left outer join. ## How was this patch tested? Added tests to `FilterPushdownSuite.scala` and created a SQLQueryTestSuite file for left anti joins with a regression test. Author: Herman van Hovell. Closes #16026 from hvanhovell/SPARK-18597. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e29824 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e29824 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e29824 Branch: refs/heads/master Commit: 38e29824d9a50464daa397c28e89610ed0aed4b6 Parents: 9f273c5 Author: Herman van Hovell Authored: Mon Nov 28 07:10:52 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 07:10:52 2016 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 6 ++-- .../optimizer/FilterPushdownSuite.scala | 33 .../resources/sql-tests/inputs/anti-join.sql| 7 + .../sql-tests/results/anti-join.sql.out | 29 + 4 files changed, 72 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38e29824/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2679e02..805cad5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftExistence(_) => +case _: InnerLike | LeftSemi | ExistenceJoin(_) => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) @@ -949,14 +949,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And) Join(newLeft, newRight, RightOuter, newJoinCond) -case LeftOuter => +case LeftOuter | LeftAnti => // push down the right side only join filter for right sub query val newLeft = left val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) - Join(newLeft, newRight, LeftOuter, newJoinCond) + Join(newLeft, newRight, joinType, newJoinCond) case FullOuter => j case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node") case UsingJoin(_, _) => sys.error("Untransformed Using join node") http://git-wip-us.apache.org/repos/asf/spark/blob/38e29824/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 019f132..3e67282 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) } + test("joins: push down where clause into left anti join") { +val x = testRelation.subquery('x) +val y = testRelation.subquery('y) +val
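The semantic hazard this patch fixes can be seen with a tiny example. The sketch below is not from the patch; the data, session setup, and the hand-applied "pushed-down" rewrite are illustrative, but it shows one concrete way that treating a LEFT ANTI join like an inner join for join-condition pushdown changes the answer:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

object AntiJoinPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-18597-sketch").getOrCreate()
    import spark.implicits._

    val left  = Seq(1, 2).toDF("a")
    val right = Seq(5).toDF("b")

    // LEFT ANTI keeps left rows with *no* matching right row. The condition
    // a = 1 holds for every right row when a = 1, so that row is dropped and
    // only a = 2 survives.
    left.join(right, $"a" === 1, "left_anti").show()                    // -> 2

    // The old rewrite pushed the single-side condition into the child and
    // dropped it from the join. The anti join then removes every remaining
    // row because `right` is non-empty -- a different (wrong) answer.
    left.filter($"a" === 1).join(right, lit(true), "left_anti").show()  // -> empty
    spark.stop()
  }
}
```

Rows that fail a pushed-down predicate are filtered out of the result entirely, whereas anti join semantics require exactly those non-matching rows to be kept; hence the fix keeps such conditions in the join condition itself.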
spark git commit: [SPARK-18597][SQL] Do not push-down join conditions to the right side of a LEFT ANTI join
Repository: spark Updated Branches: refs/heads/branch-2.1 a9d4febe9 -> 32b259fae [SPARK-18597][SQL] Do not push-down join conditions to the right side of a LEFT ANTI join ## What changes were proposed in this pull request? We currently push down join conditions of a Left Anti join to both sides of the join. This is similar to Inner, Left Semi, and Existence (a specialized left semi) joins. The problem is that this changes the semantics of the join; a left anti join filters out rows that match the join condition. This PR fixes this by only pushing down conditions to the left hand side of the join. This is similar to the behavior of left outer join. ## How was this patch tested? Added tests to `FilterPushdownSuite.scala` and created a SQLQueryTestSuite file for left anti joins with a regression test. Author: Herman van Hovell. Closes #16026 from hvanhovell/SPARK-18597. (cherry picked from commit 38e29824d9a50464daa397c28e89610ed0aed4b6) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32b259fa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32b259fa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32b259fa Branch: refs/heads/branch-2.1 Commit: 32b259faed7e0573c0f465954205cbd3b94ee440 Parents: a9d4feb Author: Herman van Hovell Authored: Mon Nov 28 07:10:52 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 07:11:02 2016 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 6 ++-- .../optimizer/FilterPushdownSuite.scala | 33 .../resources/sql-tests/inputs/anti-join.sql| 7 + .../sql-tests/results/anti-join.sql.out | 29 + 4 files changed, 72 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/32b259fa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2679e02..805cad5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftExistence(_) => +case _: InnerLike | LeftSemi | ExistenceJoin(_) => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) @@ -949,14 +949,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And) Join(newLeft, newRight, RightOuter, newJoinCond) -case LeftOuter => +case LeftOuter | LeftAnti => // push down the right side only join filter for right sub query val newLeft = left val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) - Join(newLeft, newRight, LeftOuter, newJoinCond) + Join(newLeft, newRight, joinType, newJoinCond) case FullOuter => j case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node") case UsingJoin(_, _) => sys.error("Untransformed Using join node") http://git-wip-us.apache.org/repos/asf/spark/blob/32b259fa/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 019f132..3e67282 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) } + test("joins: push
spark git commit: [SPARK-17783][SQL] Hide Credentials in CREATE and DESC FORMATTED/EXTENDED a PERSISTENT/TEMP Table for JDBC
Repository: spark Updated Branches: refs/heads/branch-2.1 e449f7546 -> a9d4febe9 [SPARK-17783][SQL] Hide Credentials in CREATE and DESC FORMATTED/EXTENDED a PERSISTENT/TEMP Table for JDBC ### What changes were proposed in this pull request? We should never expose the credentials in the EXPLAIN and DESC FORMATTED/EXTENDED commands. However, the commands below exposed the credentials. In the related PR: https://github.com/apache/spark/pull/10452 > URL patterns to specify credentials seem to vary between different databases. Thus, we hide the whole `url` value if it contains the keyword `password`. We also hide the `password` property. Before the fix, the command outputs look like: ``` SQL CREATE TABLE tab1 USING org.apache.spark.sql.jdbc OPTIONS ( url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass', dbtable 'TEST.PEOPLE', user 'testUser', password '$password') DESC FORMATTED tab1 DESC EXTENDED tab1 ``` Before the fix, - The output of SQL statement EXPLAIN ``` == Physical Plan == ExecutedCommand +- CreateDataSourceTableCommand CatalogTable( Table: `tab1` Created: Wed Nov 16 23:00:10 PST 2016 Last Access: Wed Dec 31 15:59:59 PST 1969 Type: MANAGED Provider: org.apache.spark.sql.jdbc Storage(Properties: [url=jdbc:h2:mem:testdb0;user=testUser;password=testPass, dbtable=TEST.PEOPLE, user=testUser, password=testPass])), false ``` - The output of `DESC FORMATTED` ``` ... |Storage Desc Parameters:| | | | url |jdbc:h2:mem:testdb0;user=testUser;password=testPass | | | dbtable |TEST.PEOPLE | | | user |testUser | | | password |testPass | | ++--+---+ ``` - The output of `DESC EXTENDED` ``` |# Detailed Table Information|CatalogTable( Table: `default`.`tab1` Created: Wed Nov 16 23:00:10 PST 2016 Last Access: Wed Dec 31 15:59:59 PST 1969 Type: MANAGED Schema: [StructField(NAME,StringType,false), StructField(THEID,IntegerType,false)] Provider: org.apache.spark.sql.jdbc Storage(Location: file:/Users/xiaoli/IdeaProjects/sparkDelivery/spark-warehouse/tab1, Properties: [url=jdbc:h2:mem:testdb0;user=testUser;password=testPass, dbtable=TEST.PEOPLE, user=testUser, password=testPass]))| | ``` After the fix, - The output of SQL statement EXPLAIN ``` == Physical Plan == ExecutedCommand +- CreateDataSourceTableCommand CatalogTable( Table: `tab1` Created: Wed Nov 16 22:43:49 PST 2016 Last Access: Wed Dec 31 15:59:59 PST 1969 Type: MANAGED Provider: org.apache.spark.sql.jdbc Storage(Properties: [url=###, dbtable=TEST.PEOPLE, user=testUser, password=###])), false ``` - The output of `DESC FORMATTED` ``` ... |Storage Desc Parameters:| | | | url |### | | | dbtable |TEST.PEOPLE | | | user |testUser | | | password |### | | ++--+---+ ``` - The output of `DESC EXTENDED` ``` |# Detailed Table Information|CatalogTable( Table: `default`.`tab1` Created: Wed Nov 16 22:43:49 PST 2016 Last Access: Wed Dec 31 15:59:59 PST 1969 Type: MANAGED Schema: [StructField(NAME,StringType,false), StructField(THEID,IntegerType,false)] Provider: org.apache.spark.sql.jdbc Storage(Location: file:/Users/xiaoli/IdeaProjects/sparkDelivery/spark-warehouse/tab1, Properties: [url=###, dbtable=TEST.PEOPLE, user=testUser, password=###]))| | ``` ### How was this patch tested? Added test cases Author: gatorsmile. Closes #15358 from gatorsmile/maskCredentials.
(cherry picked from commit 9f273c5173c05017c3009faaf3e10f2f70a842d0) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9d4febe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9d4febe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9d4febe Branch: refs/heads/branch-2.1 Commit: a9d4febe900aa3eb9c595089e7283a64a24c8761
spark git commit: [SPARK-17783][SQL] Hide Credentials in CREATE and DESC FORMATTED/EXTENDED a PERSISTENT/TEMP Table for JDBC
Repository: spark Updated Branches: refs/heads/master 70dfdcbbf -> 9f273c517 [SPARK-17783][SQL] Hide Credentials in CREATE and DESC FORMATTED/EXTENDED a PERSISTENT/TEMP Table for JDBC ### What changes were proposed in this pull request? We should never expose the credentials in the EXPLAIN and DESC FORMATTED/EXTENDED commands. However, the commands below exposed the credentials. In the related PR: https://github.com/apache/spark/pull/10452 > URL patterns to specify credentials seem to vary between different databases. Thus, we hide the whole `url` value if it contains the keyword `password`. We also hide the `password` property. Before the fix, the command outputs look like: ``` SQL CREATE TABLE tab1 USING org.apache.spark.sql.jdbc OPTIONS ( url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass', dbtable 'TEST.PEOPLE', user 'testUser', password '$password') DESC FORMATTED tab1 DESC EXTENDED tab1 ``` Before the fix, - The output of SQL statement EXPLAIN ``` == Physical Plan == ExecutedCommand +- CreateDataSourceTableCommand CatalogTable( Table: `tab1` Created: Wed Nov 16 23:00:10 PST 2016 Last Access: Wed Dec 31 15:59:59 PST 1969 Type: MANAGED Provider: org.apache.spark.sql.jdbc Storage(Properties: [url=jdbc:h2:mem:testdb0;user=testUser;password=testPass, dbtable=TEST.PEOPLE, user=testUser, password=testPass])), false ``` - The output of `DESC FORMATTED` ``` ... |Storage Desc Parameters:| | | | url |jdbc:h2:mem:testdb0;user=testUser;password=testPass | | | dbtable |TEST.PEOPLE | | | user |testUser | | | password |testPass | | ++--+---+ ``` - The output of `DESC EXTENDED` ``` |# Detailed Table Information|CatalogTable( Table: `default`.`tab1` Created: Wed Nov 16 23:00:10 PST 2016 Last Access: Wed Dec 31 15:59:59 PST 1969 Type: MANAGED Schema: [StructField(NAME,StringType,false), StructField(THEID,IntegerType,false)] Provider: org.apache.spark.sql.jdbc Storage(Location: file:/Users/xiaoli/IdeaProjects/sparkDelivery/spark-warehouse/tab1, Properties: [url=jdbc:h2:mem:testdb0;user=testUser;password=testPass, dbtable=TEST.PEOPLE, user=testUser, password=testPass]))| | ``` After the fix, - The output of SQL statement EXPLAIN ``` == Physical Plan == ExecutedCommand +- CreateDataSourceTableCommand CatalogTable( Table: `tab1` Created: Wed Nov 16 22:43:49 PST 2016 Last Access: Wed Dec 31 15:59:59 PST 1969 Type: MANAGED Provider: org.apache.spark.sql.jdbc Storage(Properties: [url=###, dbtable=TEST.PEOPLE, user=testUser, password=###])), false ``` - The output of `DESC FORMATTED` ``` ... |Storage Desc Parameters:| | | | url |### | | | dbtable |TEST.PEOPLE | | | user |testUser | | | password |### | | ++--+---+ ``` - The output of `DESC EXTENDED` ``` |# Detailed Table Information|CatalogTable( Table: `default`.`tab1` Created: Wed Nov 16 22:43:49 PST 2016 Last Access: Wed Dec 31 15:59:59 PST 1969 Type: MANAGED Schema: [StructField(NAME,StringType,false), StructField(THEID,IntegerType,false)] Provider: org.apache.spark.sql.jdbc Storage(Location: file:/Users/xiaoli/IdeaProjects/sparkDelivery/spark-warehouse/tab1, Properties: [url=###, dbtable=TEST.PEOPLE, user=testUser, password=###]))| | ``` ### How was this patch tested? Added test cases Author: gatorsmile. Closes #15358 from gatorsmile/maskCredentials.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f273c51 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f273c51 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f273c51 Branch: refs/heads/master Commit: 9f273c5173c05017c3009faaf3e10f2f70a842d0 Parents: 70dfdcb Author: gatorsmile Authored: Mon Nov 28 07:04:38 2016 -0800 Committer: Herman van Hovell
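A hedged sketch of the masking rule described above, written as a standalone helper (the helper name and placement are illustrative; the actual change lives in Spark's catalog and table-description code): hide `password` values outright, and hide the whole `url` when it embeds the keyword `password`, since credential URL syntax varies by database.

```scala
object MaskCredentialsSketch {
  // Mirrors the before/after output shown above: replace sensitive values
  // with "###" instead of printing them.
  def maskCredentials(options: Map[String, String]): Map[String, String] =
    options.map {
      case (key, _) if key.equalsIgnoreCase("password") => key -> "###"
      case (key, value) if key.equalsIgnoreCase("url") &&
          value.toLowerCase.contains("password") => key -> "###"
      case other => other
    }

  def main(args: Array[String]): Unit = {
    val masked = maskCredentials(Map(
      "url" -> "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
      "dbtable" -> "TEST.PEOPLE",
      "user" -> "testUser",
      "password" -> "testPass"))
    println(masked)
    // Map(url -> ###, dbtable -> TEST.PEOPLE, user -> testUser, password -> ###)
  }
}
```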
spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans
Repository: spark Updated Branches: refs/heads/branch-2.0 9070bd31c -> 759bd4a6a [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans Remove this reference. (cherry picked from commit 70dfdcbbf11c9c3174abc111afa2250236e31af2) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/759bd4a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/759bd4a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/759bd4a6 Branch: refs/heads/branch-2.0 Commit: 759bd4a6a7ea83b654089c6bd1d1574c709ca35f Parents: 9070bd3 Author: Herman van Hovell Authored: Mon Nov 28 04:41:43 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 04:46:32 2016 -0800 -- .../apache/spark/sql/catalyst/expressions/objects/objects.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/759bd4a6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 8043475..c17c807 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -695,7 +695,7 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp val fieldGen = fieldValue.genCode(ctx) s""" ${fieldGen.code} - this.${javaBeanInstance}.$setterMethod(${fieldGen.value}); + ${javaBeanInstance}.$setterMethod(${fieldGen.value}); """ } val initializeCode = ctx.splitExpressions(ctx.INPUT_ROW, initialize.toSeq)
spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans
Repository: spark Updated Branches: refs/heads/branch-2.1 712bd5abc -> e449f7546 [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans Remove this reference. (cherry picked from commit 70dfdcbbf11c9c3174abc111afa2250236e31af2) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e449f754 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e449f754 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e449f754 Branch: refs/heads/branch-2.1 Commit: e449f7546897c5f29075e6a0913a5a6106bcbb5f Parents: 712bd5a Author: Herman van Hovell Authored: Mon Nov 28 04:41:43 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 04:45:23 2016 -0800 -- .../apache/spark/sql/catalyst/expressions/objects/objects.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e449f754/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 6952f54..e517ec1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -905,7 +905,7 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp val fieldGen = fieldValue.genCode(ctx) s""" ${fieldGen.code} - this.${javaBeanInstance}.$setterMethod(${fieldGen.value}); + ${javaBeanInstance}.$setterMethod(${fieldGen.value}); """ } val initializeCode = ctx.splitExpressions(ctx.INPUT_ROW, initialize.toSeq)
spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans
Repository: spark Updated Branches: refs/heads/master f075cd9cb -> 70dfdcbbf [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans Remove this reference. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70dfdcbb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70dfdcbb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70dfdcbb Branch: refs/heads/master Commit: 70dfdcbbf11c9c3174abc111afa2250236e31af2 Parents: f075cd9 Author: Herman van Hovell Authored: Mon Nov 28 04:41:43 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 04:41:43 2016 -0800 -- .../apache/spark/sql/catalyst/expressions/objects/objects.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/70dfdcbb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 6952f54..e517ec1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -905,7 +905,7 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp val fieldGen = fieldValue.genCode(ctx) s""" ${fieldGen.code} - this.${javaBeanInstance}.$setterMethod(${fieldGen.value}); + ${javaBeanInstance}.$setterMethod(${fieldGen.value}); """ } val initializeCode = ctx.splitExpressions(ctx.INPUT_ROW, initialize.toSeq)
spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans
Repository: spark Updated Branches: refs/heads/branch-2.1 d6e027e61 -> 712bd5abc [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans ## What changes were proposed in this pull request? This PR avoids a compilation error caused by Java byte code exceeding the 64KB method size limit. This error occurs because the generated Java code for `SpecificSafeProjection.apply()` on nested JavaBeans is too big. This PR avoids the compilation error by splitting a big code chunk into multiple methods by calling `CodegenContext.splitExpressions` in `InitializeJavaBean.doGenCode`. An object reference to the JavaBean is stored in an instance variable `javaBean...`; the split methods then reference this instance variable. Generated code with this PR /* 22098 */ private void apply130_0(InternalRow i) { ... /* 22125 */ boolean isNull238 = i.isNullAt(2); /* 22126 */ InternalRow value238 = isNull238 ? null : (i.getStruct(2, 3)); /* 22127 */ boolean isNull236 = false; /* 22128 */ test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value236 = null; /* 22129 */ if (!false && isNull238) { /* 22130 */ /* 22131 */ final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value239 = null; /* 22132 */ isNull236 = true; /* 22133 */ value236 = value239; /* 22134 */ } else { /* 22135 */ /* 22136 */ final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value241 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$Nesting1(); /* 22137 */ this.javaBean14 = value241; /* 22138 */ if (!false) { /* 22139 */ apply25_0(i); /* 22140 */ apply25_1(i); /* 22141 */ apply25_2(i); /* 22142 */ } /* 22143 */ isNull236 = false; /* 22144 */ value236 = value241; /* 22145 */ } /* 22146 */ this.javaBean.setField2(value236); /* 22147 */ /* 22148 */ } ... /* 22928 */ public java.lang.Object apply(java.lang.Object _i) { /* 22929 */ InternalRow i = (InternalRow) _i; /* 22930 */ /* 22931 */ final test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean value1 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean(); /* 22932 */ this.javaBean = value1; /* 22933 */ if (!false) { /* 22934 */ apply130_0(i); /* 22935 */ apply130_1(i); /* 22936 */ apply130_2(i); /* 22937 */ apply130_3(i); /* 22938 */ apply130_4(i); /* 22939 */ } /* 22940 */ if (false) { /* 22941 */ mutableRow.setNullAt(0); /* 22942 */ } else { /* 22943 */ /* 22944 */ mutableRow.update(0, value1); /* 22945 */ } /* 22946 */ /* 22947 */ return mutableRow; /* 22948 */ } ## How was this patch tested? Added a test suite to `JavaDatasetSuite.java` Author: Kazuaki Ishizaki. Closes #16032 from kiszk/SPARK-18118.
(cherry picked from commit f075cd9cb7157819df9aec67baee8913c4ed5c53) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/712bd5ab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/712bd5ab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/712bd5ab Branch: refs/heads/branch-2.1 Commit: 712bd5abc827c4eaf3f53bfc9155c8535584ca96 Parents: d6e027e Author: Kazuaki Ishizaki Authored: Mon Nov 28 04:18:35 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 04:18:46 2016 -0800 -- .../catalyst/expressions/objects/objects.scala | 10 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 429 +++ 2 files changed, 437 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/712bd5ab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 5c27179..6952f54 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -896,19 +896,25 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val instanceGen = beanInstance.genCode(ctx) +val javaBeanInstance = ctx.freshName("javaBean") +val beanInstanceJavaType = ctx.javaType(beanInstance.dataType) +ctx.addMutableState(beanInstanceJavaType, javaBeanInstance, "") + val initialize = setters.map { case
spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans
Repository: spark Updated Branches: refs/heads/branch-2.0 e67ce4837 -> 9070bd31c [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans ## What changes were proposed in this pull request? This PR avoids a compilation error caused by Java byte code exceeding the 64KB method size limit. This error occurs because the generated Java code for `SpecificSafeProjection.apply()` on nested JavaBeans is too big. This PR avoids the compilation error by splitting a big code chunk into multiple methods by calling `CodegenContext.splitExpressions` in `InitializeJavaBean.doGenCode`. An object reference to the JavaBean is stored in an instance variable `javaBean...`; the split methods then reference this instance variable. Generated code with this PR /* 22098 */ private void apply130_0(InternalRow i) { ... /* 22125 */ boolean isNull238 = i.isNullAt(2); /* 22126 */ InternalRow value238 = isNull238 ? null : (i.getStruct(2, 3)); /* 22127 */ boolean isNull236 = false; /* 22128 */ test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value236 = null; /* 22129 */ if (!false && isNull238) { /* 22130 */ /* 22131 */ final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value239 = null; /* 22132 */ isNull236 = true; /* 22133 */ value236 = value239; /* 22134 */ } else { /* 22135 */ /* 22136 */ final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value241 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$Nesting1(); /* 22137 */ this.javaBean14 = value241; /* 22138 */ if (!false) { /* 22139 */ apply25_0(i); /* 22140 */ apply25_1(i); /* 22141 */ apply25_2(i); /* 22142 */ } /* 22143 */ isNull236 = false; /* 22144 */ value236 = value241; /* 22145 */ } /* 22146 */ this.javaBean.setField2(value236); /* 22147 */ /* 22148 */ } ... /* 22928 */ public java.lang.Object apply(java.lang.Object _i) { /* 22929 */ InternalRow i = (InternalRow) _i; /* 22930 */ /* 22931 */ final test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean value1 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean(); /* 22932 */ this.javaBean = value1; /* 22933 */ if (!false) { /* 22934 */ apply130_0(i); /* 22935 */ apply130_1(i); /* 22936 */ apply130_2(i); /* 22937 */ apply130_3(i); /* 22938 */ apply130_4(i); /* 22939 */ } /* 22940 */ if (false) { /* 22941 */ mutableRow.setNullAt(0); /* 22942 */ } else { /* 22943 */ /* 22944 */ mutableRow.update(0, value1); /* 22945 */ } /* 22946 */ /* 22947 */ return mutableRow; /* 22948 */ } ## How was this patch tested? Added a test suite to `JavaDatasetSuite.java` Author: Kazuaki Ishizaki. Closes #16032 from kiszk/SPARK-18118.
(cherry picked from commit f075cd9cb7157819df9aec67baee8913c4ed5c53) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9070bd31 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9070bd31 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9070bd31 Branch: refs/heads/branch-2.0 Commit: 9070bd31c243d74d3c28c5208bc11e41876590ca Parents: e67ce48 Author: Kazuaki Ishizaki Authored: Mon Nov 28 04:18:35 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 04:19:01 2016 -0800 -- .../catalyst/expressions/objects/objects.scala | 10 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 429 +++ 2 files changed, 437 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9070bd31/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index d9c29b3..8043475 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -686,19 +686,25 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val instanceGen = beanInstance.genCode(ctx) +val javaBeanInstance = ctx.freshName("javaBean") +val beanInstanceJavaType = ctx.javaType(beanInstance.dataType) +ctx.addMutableState(beanInstanceJavaType, javaBeanInstance, "") + val initialize = setters.map { case
spark git commit: [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans
Repository: spark Updated Branches: refs/heads/master 454b80499 -> f075cd9cb [SPARK-18118][SQL] fix a compilation error due to nested JavaBeans ## What changes were proposed in this pull request? This PR avoids a compilation error caused by Java byte code exceeding the 64KB method size limit. This error occurs because the generated Java code for `SpecificSafeProjection.apply()` on nested JavaBeans is too big. This PR avoids the compilation error by splitting a big code chunk into multiple methods by calling `CodegenContext.splitExpressions` in `InitializeJavaBean.doGenCode`. An object reference to the JavaBean is stored in an instance variable `javaBean...`; the split methods then reference this instance variable. Generated code with this PR /* 22098 */ private void apply130_0(InternalRow i) { ... /* 22125 */ boolean isNull238 = i.isNullAt(2); /* 22126 */ InternalRow value238 = isNull238 ? null : (i.getStruct(2, 3)); /* 22127 */ boolean isNull236 = false; /* 22128 */ test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value236 = null; /* 22129 */ if (!false && isNull238) { /* 22130 */ /* 22131 */ final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value239 = null; /* 22132 */ isNull236 = true; /* 22133 */ value236 = value239; /* 22134 */ } else { /* 22135 */ /* 22136 */ final test.org.apache.spark.sql.JavaDatasetSuite$Nesting1 value241 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$Nesting1(); /* 22137 */ this.javaBean14 = value241; /* 22138 */ if (!false) { /* 22139 */ apply25_0(i); /* 22140 */ apply25_1(i); /* 22141 */ apply25_2(i); /* 22142 */ } /* 22143 */ isNull236 = false; /* 22144 */ value236 = value241; /* 22145 */ } /* 22146 */ this.javaBean.setField2(value236); /* 22147 */ /* 22148 */ } ... /* 22928 */ public java.lang.Object apply(java.lang.Object _i) { /* 22929 */ InternalRow i = (InternalRow) _i; /* 22930 */ /* 22931 */ final test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean value1 = false ? null : new test.org.apache.spark.sql.JavaDatasetSuite$NestedComplicatedJavaBean(); /* 22932 */ this.javaBean = value1; /* 22933 */ if (!false) { /* 22934 */ apply130_0(i); /* 22935 */ apply130_1(i); /* 22936 */ apply130_2(i); /* 22937 */ apply130_3(i); /* 22938 */ apply130_4(i); /* 22939 */ } /* 22940 */ if (false) { /* 22941 */ mutableRow.setNullAt(0); /* 22942 */ } else { /* 22943 */ /* 22944 */ mutableRow.update(0, value1); /* 22945 */ } /* 22946 */ /* 22947 */ return mutableRow; /* 22948 */ } ## How was this patch tested? Added a test suite to `JavaDatasetSuite.java` Author: Kazuaki Ishizaki. Closes #16032 from kiszk/SPARK-18118.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f075cd9c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f075cd9c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f075cd9c Branch: refs/heads/master Commit: f075cd9cb7157819df9aec67baee8913c4ed5c53 Parents: 454b804 Author: Kazuaki Ishizaki Authored: Mon Nov 28 04:18:35 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 04:18:35 2016 -0800 -- .../catalyst/expressions/objects/objects.scala | 10 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 429 +++ 2 files changed, 437 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f075cd9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 5c27179..6952f54 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -896,19 +896,25 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val instanceGen = beanInstance.genCode(ctx) +val javaBeanInstance = ctx.freshName("javaBean") +val beanInstanceJavaType = ctx.javaType(beanInstance.dataType) +ctx.addMutableState(beanInstanceJavaType, javaBeanInstance, "") + val initialize = setters.map { case (setterMethod, fieldValue) => val fieldGen = fieldValue.genCode(ctx) s""" ${fieldGen.code} -
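For context, a rough sketch of the kind of nested-bean round trip that exercises `InitializeJavaBean` (the bean classes here are illustrative; the real regression test in `JavaDatasetSuite.java` uses much deeper and wider beans so that, before this patch, the single generated `apply()` method exceeded the JVM's 64KB-per-method bytecode limit):

```scala
import scala.beans.BeanProperty
import org.apache.spark.sql.{Encoders, SparkSession}

// Top-level JavaBean-style classes: no-arg constructor plus getters/setters
// generated by @BeanProperty.
class InnerBean {
  @BeanProperty var field1: String = _
  @BeanProperty var field2: Int = _
}
class OuterBean {
  @BeanProperty var name: String = _
  @BeanProperty var nested: InnerBean = _
}

object NestedBeanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-18118-sketch").getOrCreate()
    // Decoding rows back into beans goes through InitializeJavaBean; with
    // this patch the bean instance is held in a generated mutable field (via
    // ctx.addMutableState) so the setter calls can be split across methods.
    val ds = spark.createDataset(Seq(new OuterBean))(Encoders.bean(classOf[OuterBean]))
    ds.collect()
    spark.stop()
  }
}
```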
spark git commit: [SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same order.
Repository: spark Updated Branches: refs/heads/branch-2.1 886f880df -> d6e027e61 [SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same order. ## What changes were proposed in this pull request? The `CollapseWindow` optimizer rule changes the order of output attributes. This modifies the output of the plan, which the optimizer cannot do. This also breaks things like `collect()`, for which we use a `RowEncoder` that assumes that the output attributes of the executed plan are equal to those produced by the logical plan. ## How was this patch tested? I have updated an incorrect test in `CollapseWindowSuite`. Author: Herman van Hovell. Closes #16027 from hvanhovell/SPARK-18604. (cherry picked from commit 454b8049916a0353772a0ea5cfe14b62cbd81df4) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6e027e6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6e027e6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6e027e6 Branch: refs/heads/branch-2.1 Commit: d6e027e610bdff0123e71925735ecedcf4787b83 Parents: 886f880 Author: Herman van Hovell Authored: Mon Nov 28 02:56:26 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 02:56:38 2016 -0800 -- .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../sql/catalyst/optimizer/CollapseWindowSuite.scala | 13 - 2 files changed, 9 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d6e027e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6ba8b33..2679e02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -545,7 +545,7 @@ object CollapseRepartition extends Rule[LogicalPlan] { object CollapseWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 => - w.copy(windowExpressions = we1 ++ we2, child = grandChild) + w.copy(windowExpressions = we2 ++ we1, child = grandChild) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d6e027e6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala index 797076e..3f7d1d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala @@ -46,12 +46,15 @@ class CollapseWindowSuite extends PlanTest { .window(Seq(sum(b).as('sum_b)), partitionSpec1, orderSpec1) .window(Seq(avg(b).as('avg_b)), partitionSpec1, orderSpec1) -val optimized = Optimize.execute(query.analyze) +val analyzed = query.analyze +val optimized = Optimize.execute(analyzed) +assert(analyzed.output === optimized.output) + val correctAnswer = testRelation.window(Seq( -avg(b).as('avg_b), -sum(b).as('sum_b), -max(a).as('max_a), -min(a).as('min_a)), partitionSpec1, orderSpec1) + min(a).as('min_a), +
max(a).as('max_a), + sum(b).as('sum_b), + avg(b).as('avg_b)), partitionSpec1, orderSpec1) comparePlans(optimized, correctAnswer) }
spark git commit: [SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same order.
Repository: spark Updated Branches: refs/heads/master 87141622e -> 454b80499 [SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same order. ## What changes were proposed in this pull request? The `CollapseWindow` optimizer rule changes the order of output attributes. This modifies the output of the plan, which the optimizer cannot do. This also breaks things like `collect()`, for which we use a `RowEncoder` that assumes that the output attributes of the executed plan are equal to those produced by the logical plan. ## How was this patch tested? I have updated an incorrect test in `CollapseWindowSuite`. Author: Herman van Hovell. Closes #16027 from hvanhovell/SPARK-18604. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/454b8049 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/454b8049 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/454b8049 Branch: refs/heads/master Commit: 454b8049916a0353772a0ea5cfe14b62cbd81df4 Parents: 8714162 Author: Herman van Hovell Authored: Mon Nov 28 02:56:26 2016 -0800 Committer: Herman van Hovell Committed: Mon Nov 28 02:56:26 2016 -0800 -- .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../sql/catalyst/optimizer/CollapseWindowSuite.scala | 13 - 2 files changed, 9 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/454b8049/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6ba8b33..2679e02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -545,7 +545,7 @@ object CollapseRepartition extends Rule[LogicalPlan] { object CollapseWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 => - w.copy(windowExpressions = we1 ++ we2, child = grandChild) + w.copy(windowExpressions = we2 ++ we1, child = grandChild) } } http://git-wip-us.apache.org/repos/asf/spark/blob/454b8049/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala index 797076e..3f7d1d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala @@ -46,12 +46,15 @@ class CollapseWindowSuite extends PlanTest { .window(Seq(sum(b).as('sum_b)), partitionSpec1, orderSpec1) .window(Seq(avg(b).as('avg_b)), partitionSpec1, orderSpec1) -val optimized = Optimize.execute(query.analyze) +val analyzed = query.analyze +val optimized = Optimize.execute(analyzed) +assert(analyzed.output === optimized.output) + val correctAnswer = testRelation.window(Seq( -avg(b).as('avg_b), -sum(b).as('sum_b), -max(a).as('max_a), -min(a).as('min_a)), partitionSpec1, orderSpec1) + min(a).as('min_a), + max(a).as('max_a), + sum(b).as('sum_b), + avg(b).as('avg_b)), partitionSpec1, orderSpec1) comparePlans(optimized,
correctAnswer) }
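To close, a hedged end-to-end sketch of what `CollapseWindow` does (dataset and column names are illustrative, not from the patch): two adjacent Window operators sharing a partition/order spec collapse into one, and after this fix the merged expression list keeps the inner window's columns first (`we2 ++ we1`), so the plan's output attribute order, and therefore `collect()`'s row decoding, is unchanged.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, sum}

object CollapseWindowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-18604-sketch").getOrCreate()
    import spark.implicits._

    val w = Window.partitionBy($"k").orderBy($"b")
    val df = Seq(("x", 1), ("x", 2)).toDF("k", "b")
      .withColumn("sum_b", sum($"b").over(w)) // inner Window operator
      .withColumn("avg_b", avg($"b").over(w)) // outer Window, same spec

    // The optimized plan should contain a single Window node, and the schema
    // must still read (k, b, sum_b, avg_b) -- inner expressions first.
    df.explain(true)
    df.show()
    spark.stop()
  }
}
```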