spark git commit: [SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code

2016-06-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 91dffcabd -> f0fa0a894


[SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code

## What changes were proposed in this pull request?

If there are bugs in whole-stage codegen and the generated code cannot be 
compiled, we should fall back to non-codegen execution so that the query can 
still run.

The batch mode of the new Parquet reader depends on codegen and cannot easily 
be switched to non-batch mode, so we still use codegen for the batched scan 
(for Parquet). Because the batched scan only supports primitive types and 
requires the number of columns to be less than spark.sql.codegen.maxFields 
(100), it should not fail.

This behavior is configurable via `spark.sql.codegen.fallback`.
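
A minimal usage sketch of the new knob (assuming an already-created
SparkSession named `spark`; not part of the patch itself):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CodegenFallbackSketch")
  .master("local[*]")
  .getOrCreate()

// Allow falling back to interpreted (non-codegen) execution when the
// generated code fails to compile.
spark.conf.set("spark.sql.codegen.fallback", "true")

// Or surface compilation failures as errors instead of falling back.
spark.conf.set("spark.sql.codegen.fallback", "false")
```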

## How was this patch tested?

Manually tested with a buggy operator; it worked well.

Author: Davies Liu 

Closes #13501 from davies/codegen_fallback.

(cherry picked from commit 7504bc73f20fe0e6546a019ed91c3fd3804287ba)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0fa0a89
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0fa0a89
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0fa0a89

Branch: refs/heads/branch-2.0
Commit: f0fa0a8946fb4bdf0f4697a8e389f49e98422871
Parents: 91dffca
Author: Davies Liu 
Authored: Fri Jun 10 21:12:06 2016 -0700
Committer: Davies Liu 
Committed: Fri Jun 10 21:12:15 2016 -0700

--
 .../org/apache/spark/sql/execution/ExistingRDD.scala |  5 -
 .../spark/sql/execution/WholeStageCodegenExec.scala  | 11 ++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala| 11 ++-
 3 files changed, 24 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f0fa0a89/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 9ab98fd1..ee72a70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -234,7 +234,10 @@ private[sql] case class BatchedDataSourceScanExec(
   "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-throw new UnsupportedOperationException
+// in the case of fallback, this batched scan should never fail because of:
+// 1) only primitive types are supported
+// 2) the number of columns should be smaller than spark.sql.codegen.maxFields
+WholeStageCodegenExec(this).execute()
   }
 
   override def simpleString: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/f0fa0a89/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index e0d8e35..ac4c3aa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -29,6 +29,7 @@ import 
org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * An interface for those physical operators that support codegen.
@@ -339,12 +340,20 @@ case class WholeStageCodegenExec(child: SparkPlan) 
extends UnaryExecNode with Co
   new CodeAndComment(CodeFormatter.stripExtraNewLines(source), 
ctx.getPlaceHolderToComments()))
 
 logDebug(s"\n${CodeFormatter.format(cleanedSource)}")
-CodeGenerator.compile(cleanedSource)
 (ctx, cleanedSource)
   }
 
   override def doExecute(): RDD[InternalRow] = {
 val (ctx, cleanedSource) = doCodeGen()
+// try to compile and fallback if it failed
+try {
+  CodeGenerator.compile(cleanedSource)
+} catch {
+  case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+// We should already saw the error message
+logWarning(s"Whole-stage codegen disabled for this plan:\n 
$treeString")
+return child.execute()
+}
 val references = ctx.references.toArray
 
 val durationMs = longMetric("pipelineTime")

http://git-wip-us.apache.org/repos/asf/spark/blob/f0fa0a89/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
-

spark git commit: [SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code

2016-06-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 468da03e2 -> 7504bc73f


[SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code

## What changes were proposed in this pull request?

If there are bugs in whole-stage codegen and the generated code cannot be 
compiled, we should fall back to non-codegen execution so that the query can 
still run.

The batch mode of the new Parquet reader depends on codegen and cannot easily 
be switched to non-batch mode, so we still use codegen for the batched scan 
(for Parquet). Because the batched scan only supports primitive types and 
requires the number of columns to be less than spark.sql.codegen.maxFields 
(100), it should not fail.

This behavior is configurable via `spark.sql.codegen.fallback`.

## How was this patch tested?

Manually tested with a buggy operator; it worked well.

Author: Davies Liu 

Closes #13501 from davies/codegen_fallback.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7504bc73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7504bc73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7504bc73

Branch: refs/heads/master
Commit: 7504bc73f20fe0e6546a019ed91c3fd3804287ba
Parents: 468da03
Author: Davies Liu 
Authored: Fri Jun 10 21:12:06 2016 -0700
Committer: Davies Liu 
Committed: Fri Jun 10 21:12:06 2016 -0700

--
 .../org/apache/spark/sql/execution/ExistingRDD.scala |  5 -
 .../spark/sql/execution/WholeStageCodegenExec.scala  | 11 ++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala| 11 ++-
 3 files changed, 24 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 9ab98fd1..ee72a70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -234,7 +234,10 @@ private[sql] case class BatchedDataSourceScanExec(
   "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-throw new UnsupportedOperationException
+// in the case of fallback, this batched scan should never fail because of:
+// 1) only primitive types are supported
+// 2) the number of columns should be smaller than spark.sql.codegen.maxFields
+WholeStageCodegenExec(this).execute()
   }
 
   override def simpleString: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index e0d8e35..ac4c3aa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -29,6 +29,7 @@ import 
org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * An interface for those physical operators that support codegen.
@@ -339,12 +340,20 @@ case class WholeStageCodegenExec(child: SparkPlan) 
extends UnaryExecNode with Co
   new CodeAndComment(CodeFormatter.stripExtraNewLines(source), 
ctx.getPlaceHolderToComments()))
 
 logDebug(s"\n${CodeFormatter.format(cleanedSource)}")
-CodeGenerator.compile(cleanedSource)
 (ctx, cleanedSource)
   }
 
   override def doExecute(): RDD[InternalRow] = {
 val (ctx, cleanedSource) = doCodeGen()
+// try to compile and fallback if it failed
+try {
+  CodeGenerator.compile(cleanedSource)
+} catch {
+  case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+// We should already saw the error message
+logWarning(s"Whole-stage codegen disabled for this plan:\n 
$treeString")
+return child.execute()
+}
 val references = ctx.references.toArray
 
 val durationMs = longMetric("pipelineTime")

http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/core/src/main/scala/org/apach

spark git commit: Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader"

2016-06-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a08715c7a -> 91dffcabd


Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for 
parquet reader"

This reverts commit 7d6bd1196410563bd1fccc10e7bff6e75b5c9f22.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91dffcab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91dffcab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91dffcab

Branch: refs/heads/branch-2.0
Commit: 91dffcabdecd4ab651024c027cf9716664084e1e
Parents: a08715c
Author: Cheng Lian 
Authored: Fri Jun 10 20:45:27 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 10 20:45:27 2016 -0700

--
 .../catalyst/expressions/namedExpressions.scala |  8 ---
 .../datasources/FileSourceStrategy.scala|  9 +--
 .../datasources/parquet/ParquetFileFormat.scala | 61 ++--
 3 files changed, 57 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/91dffcab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index c06a1ea..306a99d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -292,14 +292,6 @@ case class AttributeReference(
 }
   }
 
-  def withMetadata(newMetadata: Metadata): AttributeReference = {
-if (metadata == newMetadata) {
-  this
-} else {
-  AttributeReference(name, dataType, nullable, newMetadata)(exprId, 
qualifier, isGenerated)
-}
-  }
-
   override protected final def otherCopyArgs: Seq[AnyRef] = {
 exprId :: qualifier :: isGenerated :: Nil
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/91dffcab/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 7fc842f..13a86bf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -84,14 +84,7 @@ private[sql] object FileSourceStrategy extends Strategy with 
Logging {
   logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
   val dataColumns =
-l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c =>
-  files.dataSchema.find(_.name == c.name).map { f =>
-c match {
-  case a: AttributeReference => a.withMetadata(f.metadata)
-  case _ => c
-}
-  }.getOrElse(c)
-}
+l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
 
   // Partition keys are not available in the statistics of the files.
   val dataFilters = 
normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/91dffcab/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index bc4a9de..3735c94 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -357,11 +357,6 @@ private[sql] class ParquetFileFormat
   val hadoopAttemptContext =
 new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, 
attemptId)
 
-  // Try to push down filters when filter push-down is enabled.
-  // Notice: This push-down is RowGroups level, not individual records.
-  pushed.foreach {
-ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
-  }
   val parquetReader = if (enableVectorizedReader) {
 val vectorizedReader = new VectorizedParquetRecordReader()
 vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -597,6 +592,62 @@ private[sql

spark git commit: [SPARK-15678] Add support to REFRESH data source paths

2016-06-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 8e7b56f3d -> 468da03e2


[SPARK-15678] Add support to REFRESH data source paths

## What changes were proposed in this pull request?

Spark currently incorrectly continues to use cached data even if the underlying 
data is overwritten.

Current behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 < We are still using the cached dataset
```

This patch fixes this bug by adding support for `REFRESH path` that invalidates 
and refreshes all the cached data (and the associated metadata) for any 
dataframe that contains the given data source path.

Expected behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshResource(dir)
sqlContext.read.parquet(dir).count() // outputs 10 < We are not using the cached dataset
```
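
The same invalidation can also be triggered from SQL via the new `REFRESH path`
statement. A small sketch (assuming a SparkSession named `spark`; the quoting of
the path follows the new `REFRESH .*?` parser rule shown in the SqlBase.g4 diff
below):

```scala
val dir = "/tmp/test"
// SQL form of the refresh added by this patch.
spark.sql("REFRESH \"" + dir + "\"")
// Programmatic form added to the Catalog API (see Catalog.scala below).
spark.catalog.refreshByPath(dir)
```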

## How was this patch tested?

Unit tests for overwrites and appends in `ParquetQuerySuite` and 
`CachedTableSuite`.

Author: Sameer Agarwal 

Closes #13566 from sameeragarwal/refresh-path-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/468da03e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/468da03e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/468da03e

Branch: refs/heads/master
Commit: 468da03e23a01e02718608f05d778386cbb8416b
Parents: 8e7b56f
Author: Sameer Agarwal 
Authored: Fri Jun 10 20:43:18 2016 -0700
Committer: Davies Liu 
Committed: Fri Jun 10 20:43:18 2016 -0700

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  1 +
 .../org/apache/spark/sql/catalog/Catalog.scala  |  7 +++
 .../spark/sql/execution/CacheManager.scala  | 51 +++-
 .../spark/sql/execution/SparkSqlParser.scala|  9 +++-
 .../spark/sql/execution/datasources/ddl.scala   |  9 
 .../apache/spark/sql/internal/CatalogImpl.scala | 10 
 .../datasources/parquet/ParquetQuerySuite.scala | 28 +++
 .../spark/sql/hive/CachedTableSuite.scala   | 45 +
 8 files changed, 158 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index d102559..044f910 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -113,6 +113,7 @@ statement
 | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
 tableIdentifier partitionSpec? describeColName?   #describeTable
 | REFRESH TABLE tableIdentifier   #refreshTable
+| REFRESH .*?   #refreshResource
 | CACHE LAZY? TABLE identifier (AS? query)?   #cacheTable
 | UNCACHE TABLE identifier   #uncacheTable
 | CLEAR CACHE   #clearCache

http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 6ddb1a7..083a63c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -226,4 +226,11 @@ abstract class Catalog {
*/
   def refreshTable(tableName: String): Unit
 
+  /**
+   * Invalidate and refresh all the cached data (and the associated metadata) 
for any dataframe that
+   * contains the given data source path.
+   *
+   * @since 2.0.0
+   */
+  def refreshByPath(path: String): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scal

spark git commit: [SPARK-15678] Add support to REFRESH data source paths

2016-06-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 798825c09 -> a08715c7a


[SPARK-15678] Add support to REFRESH data source paths

## What changes were proposed in this pull request?

Spark currently incorrectly continues to use cached data even if the underlying 
data is overwritten.

Current behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 < We are still using the cached dataset
```

This patch fixes this bug by adding support for `REFRESH path` that invalidates 
and refreshes all the cached data (and the associated metadata) for any 
dataframe that contains the given data source path.

Expected behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshResource(dir)
sqlContext.read.parquet(dir).count() // outputs 10 < We are not using the cached dataset
```

## How was this patch tested?

Unit tests for overwrites and appends in `ParquetQuerySuite` and 
`CachedTableSuite`.

Author: Sameer Agarwal 

Closes #13566 from sameeragarwal/refresh-path-2.

(cherry picked from commit 468da03e23a01e02718608f05d778386cbb8416b)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a08715c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a08715c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a08715c7

Branch: refs/heads/branch-2.0
Commit: a08715c7a79ce1953b8d64a9cf0ec1c513d56eec
Parents: 798825c
Author: Sameer Agarwal 
Authored: Fri Jun 10 20:43:18 2016 -0700
Committer: Davies Liu 
Committed: Fri Jun 10 20:43:26 2016 -0700

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  1 +
 .../org/apache/spark/sql/catalog/Catalog.scala  |  7 +++
 .../spark/sql/execution/CacheManager.scala  | 51 +++-
 .../spark/sql/execution/SparkSqlParser.scala|  9 +++-
 .../spark/sql/execution/datasources/ddl.scala   |  9 
 .../apache/spark/sql/internal/CatalogImpl.scala | 10 
 .../datasources/parquet/ParquetQuerySuite.scala | 28 +++
 .../spark/sql/hive/CachedTableSuite.scala   | 45 +
 8 files changed, 158 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index d102559..044f910 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -113,6 +113,7 @@ statement
 | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
 tableIdentifier partitionSpec? describeColName?   #describeTable
 | REFRESH TABLE tableIdentifier   #refreshTable
+| REFRESH .*?   #refreshResource
 | CACHE LAZY? TABLE identifier (AS? query)?   #cacheTable
 | UNCACHE TABLE identifier   #uncacheTable
 | CLEAR CACHE   #clearCache

http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 6ddb1a7..083a63c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -226,4 +226,11 @@ abstract class Catalog {
*/
   def refreshTable(tableName: String): Unit
 
+  /**
+   * Invalidate and refresh all the cached data (and the associated metadata) 
for any dataframe that
+   * contains the given data source path.
+   *
+   * @since 2.0.0
+   */
+  def refreshByPath(path: String): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql

spark git commit: Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader"

2016-06-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 99f3c8277 -> 8e7b56f3d


Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for 
parquet reader"

This reverts commit bba5d7999f7b3ae9d816ea552ba9378fea1615a6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e7b56f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e7b56f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e7b56f3

Branch: refs/heads/master
Commit: 8e7b56f3d4917692d3ff44d91aa264738a6fc2ed
Parents: 99f3c82
Author: Cheng Lian 
Authored: Fri Jun 10 20:41:48 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 10 20:41:48 2016 -0700

--
 .../catalyst/expressions/namedExpressions.scala |  8 ---
 .../datasources/FileSourceStrategy.scala|  9 +--
 .../datasources/parquet/ParquetFileFormat.scala | 61 ++--
 3 files changed, 57 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index c06a1ea..306a99d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -292,14 +292,6 @@ case class AttributeReference(
 }
   }
 
-  def withMetadata(newMetadata: Metadata): AttributeReference = {
-if (metadata == newMetadata) {
-  this
-} else {
-  AttributeReference(name, dataType, nullable, newMetadata)(exprId, 
qualifier, isGenerated)
-}
-  }
-
   override protected final def otherCopyArgs: Seq[AnyRef] = {
 exprId :: qualifier :: isGenerated :: Nil
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 7fc842f..13a86bf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -84,14 +84,7 @@ private[sql] object FileSourceStrategy extends Strategy with 
Logging {
   logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
   val dataColumns =
-l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c =>
-  files.dataSchema.find(_.name == c.name).map { f =>
-c match {
-  case a: AttributeReference => a.withMetadata(f.metadata)
-  case _ => c
-}
-  }.getOrElse(c)
-}
+l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
 
   // Partition keys are not available in the statistics of the files.
   val dataFilters = 
normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index bc4a9de..3735c94 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -357,11 +357,6 @@ private[sql] class ParquetFileFormat
   val hadoopAttemptContext =
 new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, 
attemptId)
 
-  // Try to push down filters when filter push-down is enabled.
-  // Notice: This push-down is RowGroups level, not individual records.
-  pushed.foreach {
-ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
-  }
   val parquetReader = if (enableVectorizedReader) {
 val vectorizedReader = new VectorizedParquetRecordReader()
 vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -597,6 +592,62 @@ private[sql] object

spark git commit: [SPARK-14615][ML][FOLLOWUP] Fix Python examples to use the new ML Vector and Matrix APIs in the ML pipeline based algorithms

2016-06-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 7d6bd1196 -> 798825c09


[SPARK-14615][ML][FOLLOWUP] Fix Python examples to use the new ML Vector and 
Matrix APIs in the ML pipeline based algorithms

## What changes were proposed in this pull request?

This PR fixes Python examples to use the new ML Vector and Matrix APIs in the 
ML pipeline based algorithms.

I first ran the shell command `grep -r "from pyspark.mllib" .` and then 
executed all of the matched examples.
Some of the tests in `ml` produced error messages like the one below:

```
pyspark.sql.utils.IllegalArgumentException: u'requirement failed: Input type must be VectorUDT but got org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.'
```

So I fixed them to use the new APIs, in the same way as the Python tests fixed 
in https://github.com/apache/spark/pull/12627.

## How was this patch tested?

Manually tested for all the examples listed by `grep -r "from pyspark.mllib" .`.

Author: hyukjinkwon 

Closes #13393 from HyukjinKwon/SPARK-14615.

(cherry picked from commit 99f3c82776fe5ea4f89a9965a288c7447585dc2c)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/798825c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/798825c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/798825c0

Branch: refs/heads/branch-2.0
Commit: 798825c09ba55dca449bde3f00ff2aeafd6b05b7
Parents: 7d6bd11
Author: hyukjinkwon 
Authored: Fri Jun 10 18:29:26 2016 -0700
Committer: Joseph K. Bradley 
Committed: Fri Jun 10 18:29:37 2016 -0700

--
 .../main/python/ml/aft_survival_regression.py|  2 +-
 .../src/main/python/ml/chisq_selector_example.py |  2 +-
 examples/src/main/python/ml/dct_example.py   |  2 +-
 .../python/ml/elementwise_product_example.py |  2 +-
 .../ml/estimator_transformer_param_example.py|  2 +-
 examples/src/main/python/ml/pca_example.py   |  2 +-
 .../python/ml/polynomial_expansion_example.py|  2 +-
 .../src/main/python/ml/simple_params_example.py  | 19 +--
 .../main/python/ml/vector_assembler_example.py   |  2 +-
 .../src/main/python/ml/vector_slicer_example.py  |  2 +-
 10 files changed, 18 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/798825c0/examples/src/main/python/ml/aft_survival_regression.py
--
diff --git a/examples/src/main/python/ml/aft_survival_regression.py 
b/examples/src/main/python/ml/aft_survival_regression.py
index 9879679..060f017 100644
--- a/examples/src/main/python/ml/aft_survival_regression.py
+++ b/examples/src/main/python/ml/aft_survival_regression.py
@@ -19,7 +19,7 @@ from __future__ import print_function
 
 # $example on$
 from pyspark.ml.regression import AFTSurvivalRegression
-from pyspark.mllib.linalg import Vectors
+from pyspark.ml.linalg import Vectors
 # $example off$
 from pyspark.sql import SparkSession
 

http://git-wip-us.apache.org/repos/asf/spark/blob/798825c0/examples/src/main/python/ml/chisq_selector_example.py
--
diff --git a/examples/src/main/python/ml/chisq_selector_example.py 
b/examples/src/main/python/ml/chisq_selector_example.py
index 8bafb94..5e19ef1 100644
--- a/examples/src/main/python/ml/chisq_selector_example.py
+++ b/examples/src/main/python/ml/chisq_selector_example.py
@@ -20,7 +20,7 @@ from __future__ import print_function
 from pyspark.sql import SparkSession
 # $example on$
 from pyspark.ml.feature import ChiSqSelector
-from pyspark.mllib.linalg import Vectors
+from pyspark.ml.linalg import Vectors
 # $example off$
 
 if __name__ == "__main__":

http://git-wip-us.apache.org/repos/asf/spark/blob/798825c0/examples/src/main/python/ml/dct_example.py
--
diff --git a/examples/src/main/python/ml/dct_example.py 
b/examples/src/main/python/ml/dct_example.py
index e36fcde..a4f25df 100644
--- a/examples/src/main/python/ml/dct_example.py
+++ b/examples/src/main/python/ml/dct_example.py
@@ -19,7 +19,7 @@ from __future__ import print_function
 
 # $example on$
 from pyspark.ml.feature import DCT
-from pyspark.mllib.linalg import Vectors
+from pyspark.ml.linalg import Vectors
 # $example off$
 from pyspark.sql import SparkSession
 

http://git-wip-us.apache.org/repos/asf/spark/blob/798825c0/examples/src/main/python/ml/elementwise_product_example.py
--
diff --git a/examples/src/main/python/ml/elementwise_product_example.py 
b/examples/src/main/python/ml/elementwise_product_example.py
index 41727ed..598deae 100644
--- a/examples/src/main/python/ml/elementwise_product_example.py
+++ b/examples/src/main/python/ml/elementwise_product_exa

spark git commit: [SPARK-14615][ML][FOLLOWUP] Fix Python examples to use the new ML Vector and Matrix APIs in the ML pipeline based algorithms

2016-06-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master bba5d7999 -> 99f3c8277


[SPARK-14615][ML][FOLLOWUP] Fix Python examples to use the new ML Vector and 
Matrix APIs in the ML pipeline based algorithms

## What changes were proposed in this pull request?

This PR fixes Python examples to use the new ML Vector and Matrix APIs in the 
ML pipeline based algorithms.

I first ran the shell command `grep -r "from pyspark.mllib" .` and then 
executed all of the matched examples.
Some of the tests in `ml` produced error messages like the one below:

```
pyspark.sql.utils.IllegalArgumentException: u'requirement failed: Input type must be VectorUDT but got org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.'
```

So I fixed them to use the new APIs, in the same way as the Python tests fixed 
in https://github.com/apache/spark/pull/12627.

## How was this patch tested?

Manually tested for all the examples listed by `grep -r "from pyspark.mllib" .`.

Author: hyukjinkwon 

Closes #13393 from HyukjinKwon/SPARK-14615.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99f3c827
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99f3c827
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99f3c827

Branch: refs/heads/master
Commit: 99f3c82776fe5ea4f89a9965a288c7447585dc2c
Parents: bba5d79
Author: hyukjinkwon 
Authored: Fri Jun 10 18:29:26 2016 -0700
Committer: Joseph K. Bradley 
Committed: Fri Jun 10 18:29:26 2016 -0700

--
 .../main/python/ml/aft_survival_regression.py|  2 +-
 .../src/main/python/ml/chisq_selector_example.py |  2 +-
 examples/src/main/python/ml/dct_example.py   |  2 +-
 .../python/ml/elementwise_product_example.py |  2 +-
 .../ml/estimator_transformer_param_example.py|  2 +-
 examples/src/main/python/ml/pca_example.py   |  2 +-
 .../python/ml/polynomial_expansion_example.py|  2 +-
 .../src/main/python/ml/simple_params_example.py  | 19 +--
 .../main/python/ml/vector_assembler_example.py   |  2 +-
 .../src/main/python/ml/vector_slicer_example.py  |  2 +-
 10 files changed, 18 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/99f3c827/examples/src/main/python/ml/aft_survival_regression.py
--
diff --git a/examples/src/main/python/ml/aft_survival_regression.py 
b/examples/src/main/python/ml/aft_survival_regression.py
index 9879679..060f017 100644
--- a/examples/src/main/python/ml/aft_survival_regression.py
+++ b/examples/src/main/python/ml/aft_survival_regression.py
@@ -19,7 +19,7 @@ from __future__ import print_function
 
 # $example on$
 from pyspark.ml.regression import AFTSurvivalRegression
-from pyspark.mllib.linalg import Vectors
+from pyspark.ml.linalg import Vectors
 # $example off$
 from pyspark.sql import SparkSession
 

http://git-wip-us.apache.org/repos/asf/spark/blob/99f3c827/examples/src/main/python/ml/chisq_selector_example.py
--
diff --git a/examples/src/main/python/ml/chisq_selector_example.py 
b/examples/src/main/python/ml/chisq_selector_example.py
index 8bafb94..5e19ef1 100644
--- a/examples/src/main/python/ml/chisq_selector_example.py
+++ b/examples/src/main/python/ml/chisq_selector_example.py
@@ -20,7 +20,7 @@ from __future__ import print_function
 from pyspark.sql import SparkSession
 # $example on$
 from pyspark.ml.feature import ChiSqSelector
-from pyspark.mllib.linalg import Vectors
+from pyspark.ml.linalg import Vectors
 # $example off$
 
 if __name__ == "__main__":

http://git-wip-us.apache.org/repos/asf/spark/blob/99f3c827/examples/src/main/python/ml/dct_example.py
--
diff --git a/examples/src/main/python/ml/dct_example.py 
b/examples/src/main/python/ml/dct_example.py
index e36fcde..a4f25df 100644
--- a/examples/src/main/python/ml/dct_example.py
+++ b/examples/src/main/python/ml/dct_example.py
@@ -19,7 +19,7 @@ from __future__ import print_function
 
 # $example on$
 from pyspark.ml.feature import DCT
-from pyspark.mllib.linalg import Vectors
+from pyspark.ml.linalg import Vectors
 # $example off$
 from pyspark.sql import SparkSession
 

http://git-wip-us.apache.org/repos/asf/spark/blob/99f3c827/examples/src/main/python/ml/elementwise_product_example.py
--
diff --git a/examples/src/main/python/ml/elementwise_product_example.py 
b/examples/src/main/python/ml/elementwise_product_example.py
index 41727ed..598deae 100644
--- a/examples/src/main/python/ml/elementwise_product_example.py
+++ b/examples/src/main/python/ml/elementwise_product_example.py
@@ -19,7 +19,7 @@ from __future__ import print_function
 
 # $example on$
 from pyspark.ml.feature impor

spark git commit: [SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader

2016-06-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0a450cfff -> 7d6bd1196


[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader

## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase`, used by the vectorized 
Parquet reader, tries to get pushed-down filters from the given configuration. 
These pushed-down filters are used for RowGroups-level filtering. However, we 
never put the filters into the configuration, so they are not actually pushed 
down to do RowGroups-level filtering. This patch fixes that by setting up the 
filters in the configuration handed to the reader.
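
A minimal sketch of the idea behind the patch (names other than the Parquet API
call are illustrative): the converted Parquet `FilterPredicate` must be placed
into the Hadoop configuration handed to the reader, which is what
`SpecificParquetRecordReaderBase` later reads back for RowGroups-level
filtering.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.ParquetInputFormat

// Push an already-converted Parquet filter into the Hadoop conf so that the
// reader can skip whole row groups that cannot match.
def pushRowGroupFilter(hadoopConf: Configuration, pushed: Option[FilterPredicate]): Unit = {
  pushed.foreach(ParquetInputFormat.setFilterPredicate(hadoopConf, _))
}
```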

## How was this patch tested?
Existing tests should pass.

Author: Liang-Chi Hsieh 

Closes #13371 from viirya/vectorized-reader-push-down-filter.

(cherry picked from commit bba5d7999f7b3ae9d816ea552ba9378fea1615a6)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d6bd119
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d6bd119
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d6bd119

Branch: refs/heads/branch-2.0
Commit: 7d6bd1196410563bd1fccc10e7bff6e75b5c9f22
Parents: 0a450cf
Author: Liang-Chi Hsieh 
Authored: Fri Jun 10 18:23:59 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 10 18:24:06 2016 -0700

--
 .../catalyst/expressions/namedExpressions.scala |  8 +++
 .../datasources/FileSourceStrategy.scala|  9 ++-
 .../datasources/parquet/ParquetFileFormat.scala | 61 ++--
 3 files changed, 21 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7d6bd119/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 306a99d..c06a1ea 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -292,6 +292,14 @@ case class AttributeReference(
 }
   }
 
+  def withMetadata(newMetadata: Metadata): AttributeReference = {
+if (metadata == newMetadata) {
+  this
+} else {
+  AttributeReference(name, dataType, nullable, newMetadata)(exprId, 
qualifier, isGenerated)
+}
+  }
+
   override protected final def otherCopyArgs: Seq[AnyRef] = {
 exprId :: qualifier :: isGenerated :: Nil
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d6bd119/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 13a86bf..7fc842f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -84,7 +84,14 @@ private[sql] object FileSourceStrategy extends Strategy with 
Logging {
   logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
   val dataColumns =
-l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
+l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c =>
+  files.dataSchema.find(_.name == c.name).map { f =>
+c match {
+  case a: AttributeReference => a.withMetadata(f.metadata)
+  case _ => c
+}
+  }.getOrElse(c)
+}
 
   // Partition keys are not available in the statistics of the files.
   val dataFilters = 
normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/7d6bd119/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 3735c94..bc4a9de 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spar

spark git commit: [SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader

2016-06-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 54f758b5f -> bba5d7999


[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader

## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase`, used by the vectorized 
Parquet reader, tries to get pushed-down filters from the given configuration. 
These pushed-down filters are used for RowGroups-level filtering. However, we 
never put the filters into the configuration, so they are not actually pushed 
down to do RowGroups-level filtering. This patch fixes that by setting up the 
filters in the configuration handed to the reader.

## How was this patch tested?
Existing tests should pass.

Author: Liang-Chi Hsieh 

Closes #13371 from viirya/vectorized-reader-push-down-filter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bba5d799
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bba5d799
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bba5d799

Branch: refs/heads/master
Commit: bba5d7999f7b3ae9d816ea552ba9378fea1615a6
Parents: 54f758b
Author: Liang-Chi Hsieh 
Authored: Fri Jun 10 18:23:59 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 10 18:23:59 2016 -0700

--
 .../catalyst/expressions/namedExpressions.scala |  8 +++
 .../datasources/FileSourceStrategy.scala|  9 ++-
 .../datasources/parquet/ParquetFileFormat.scala | 61 ++--
 3 files changed, 21 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bba5d799/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 306a99d..c06a1ea 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -292,6 +292,14 @@ case class AttributeReference(
 }
   }
 
+  def withMetadata(newMetadata: Metadata): AttributeReference = {
+if (metadata == newMetadata) {
+  this
+} else {
+  AttributeReference(name, dataType, nullable, newMetadata)(exprId, 
qualifier, isGenerated)
+}
+  }
+
   override protected final def otherCopyArgs: Seq[AnyRef] = {
 exprId :: qualifier :: isGenerated :: Nil
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bba5d799/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 13a86bf..7fc842f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -84,7 +84,14 @@ private[sql] object FileSourceStrategy extends Strategy with 
Logging {
   logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
   val dataColumns =
-l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
+l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c =>
+  files.dataSchema.find(_.name == c.name).map { f =>
+c match {
+  case a: AttributeReference => a.withMetadata(f.metadata)
+  case _ => c
+}
+  }.getOrElse(c)
+}
 
   // Partition keys are not available in the statistics of the files.
   val dataFilters = 
normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/bba5d799/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 3735c94..bc4a9de 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -357,6 +357,11 @@ private[sql] class Parqu

spark git commit: [SPARK-15884][SPARKR][SQL] Overriding stringArgs in MapPartitionsInR

2016-06-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f41f433b1 -> 0a450cfff


[SPARK-15884][SPARKR][SQL] Overriding stringArgs in MapPartitionsInR

## What changes were proposed in this pull request?

As discussed in https://github.com/apache/spark/pull/12836, we need to 
override the stringArgs method in MapPartitionsInR in order to avoid the 
overly large strings that the default stringArgs would generate from the 
input arguments.

In this case we exclude some of the input arguments from the string form: 
the serialized R objects.
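
A standalone sketch of the general idea (illustrative only, not Catalyst code):
restrict the arguments that participate in a node's string rendering so that
large serialized payloads stay out of plan strings.

```scala
// A tree-node-like class whose argument string is built from stringArgs,
// conceptually like Catalyst's TreeNode.argString.
abstract class NodeSketch extends Product {
  // By default every constructor argument participates in the string form.
  protected def stringArgs: Iterator[Any] = productIterator
  def argString: String = stringArgs.mkString(", ")
}

case class MapPartitionsInRSketch(
    func: Seq[Byte],            // stand-in for the serialized R closure
    inputSchema: String,
    outputSchema: String) extends NodeSketch {
  // Mirrors the patch: exclude the serialized payload from the string form.
  override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, outputSchema)
}

object NodeSketchDemo extends App {
  val node = MapPartitionsInRSketch(Seq.fill(10000)(0.toByte), "a INT", "b INT")
  println(node.argString) // prints "a INT, b INT"; the 10000-element payload is omitted
}
```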

## How was this patch tested?
Existing test cases

Author: Narine Kokhlikyan 

Closes #13610 from NarineK/dapply_MapPartitionsInR_stringArgs.

(cherry picked from commit 54f758b5fc60ecb0da6b191939a72ef5829be38c)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a450cff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a450cff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a450cff

Branch: refs/heads/branch-2.0
Commit: 0a450cfffada67f841795a09af3bf6320343b358
Parents: f41f433
Author: Narine Kokhlikyan 
Authored: Fri Jun 10 17:17:47 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 10 17:17:57 2016 -0700

--
 .../org/apache/spark/sql/catalyst/plans/logical/object.scala  | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a450cff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 55d8adf..78e8822 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -141,6 +141,9 @@ case class MapPartitionsInR(
 outputObjAttr: Attribute,
 child: LogicalPlan) extends ObjectConsumer with ObjectProducer {
   override lazy val schema = outputSchema
+
+  override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, 
outputSchema,
+outputObjAttr, child)
 }
 
 object MapElements {





spark git commit: [SPARK-15884][SPARKR][SQL] Overriding stringArgs in MapPartitionsInR

2016-06-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 2022afe57 -> 54f758b5f


[SPARK-15884][SPARKR][SQL] Overriding stringArgs in MapPartitionsInR

## What changes were proposed in this pull request?

As discussed in https://github.com/apache/spark/pull/12836, we need to 
override the stringArgs method in MapPartitionsInR in order to avoid the 
overly large strings that the default stringArgs would generate from the 
input arguments.

In this case we exclude some of the input arguments from the string form: 
the serialized R objects.

## How was this patch tested?
Existing test cases

Author: Narine Kokhlikyan 

Closes #13610 from NarineK/dapply_MapPartitionsInR_stringArgs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54f758b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54f758b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54f758b5

Branch: refs/heads/master
Commit: 54f758b5fc60ecb0da6b191939a72ef5829be38c
Parents: 2022afe
Author: Narine Kokhlikyan 
Authored: Fri Jun 10 17:17:47 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 10 17:17:47 2016 -0700

--
 .../org/apache/spark/sql/catalyst/plans/logical/object.scala  | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/54f758b5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 55d8adf..78e8822 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -141,6 +141,9 @@ case class MapPartitionsInR(
 outputObjAttr: Attribute,
 child: LogicalPlan) extends ObjectConsumer with ObjectProducer {
   override lazy val schema = outputSchema
+
+  override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, 
outputSchema,
+outputObjAttr, child)
 }
 
 object MapElements {





spark git commit: [SPARK-15773][CORE][EXAMPLE] Avoid creating local variable `sc` in examples if possible

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e6ebb547b -> f41f433b1


[SPARK-15773][CORE][EXAMPLE] Avoid creating local variable `sc` in examples if 
possible

## What changes were proposed in this pull request?

Instead of using a local variable `sc` as in the following example, this PR 
uses `spark.sparkContext` directly. This makes the examples more concise and 
also fixes some misleading code, i.e., creating a SparkContext from a 
SparkSession.
```
-println("Creating SparkContext")
-val sc = spark.sparkContext
-
 println("Writing local file to DFS")
 val dfsFilename = dfsDirPath + "/dfs_read_write_test"
-val fileRDD = sc.parallelize(fileContents)
+val fileRDD = spark.sparkContext.parallelize(fileContents)
```

This will change 12 files (+30 lines, -52 lines).
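
A self-contained sketch of the preferred pattern (application name and data are
illustrative):

```scala
import org.apache.spark.sql.SparkSession

object SparkContextFromSessionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkContextFromSessionSketch")
      .master("local[*]")
      .getOrCreate()

    // No intermediate `val sc = spark.sparkContext`: use the session's
    // SparkContext directly where an RDD is needed.
    val fileContents = Seq("line 1", "line 2", "line 3")
    val fileRDD = spark.sparkContext.parallelize(fileContents)
    println(s"RDD has ${fileRDD.count()} lines")

    spark.stop()
  }
}
```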

## How was this patch tested?

Manual.

Author: Dongjoon Hyun 

Closes #13520 from dongjoon-hyun/SPARK-15773.

(cherry picked from commit 2022afe57dbf8cb0c9909399962c4a3649e0601c)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f41f433b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f41f433b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f41f433b

Branch: refs/heads/branch-2.0
Commit: f41f433b101d5eac5bdd3a033e15f69e2215d30d
Parents: e6ebb54
Author: Dongjoon Hyun 
Authored: Fri Jun 10 15:40:29 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 15:40:35 2016 -0700

--
 examples/src/main/python/pi.py  |  4 +---
 examples/src/main/python/transitive_closure.py  |  4 +---
 .../apache/spark/examples/DFSReadWriteTest.scala|  7 ++-
 .../spark/examples/ExceptionHandlingTest.scala  |  3 +--
 .../org/apache/spark/examples/GroupByTest.scala | 14 ++
 .../apache/spark/examples/MultiBroadcastTest.scala  |  8 +++-
 .../spark/examples/SimpleSkewedGroupByTest.scala| 16 +++-
 .../apache/spark/examples/SkewedGroupByTest.scala   | 13 +
 .../scala/org/apache/spark/examples/SparkLR.scala   |  4 +---
 .../scala/org/apache/spark/examples/SparkPi.scala   |  3 +--
 .../scala/org/apache/spark/examples/SparkTC.scala   |  3 +--
 .../spark/examples/sql/hive/HiveFromSpark.scala |  3 +--
 12 files changed, 30 insertions(+), 52 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f41f433b/examples/src/main/python/pi.py
--
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index b39d710..e3f0c4a 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -32,8 +32,6 @@ if __name__ == "__main__":
 .appName("PythonPi")\
 .getOrCreate()
 
-sc = spark.sparkContext
-
 partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
 n = 10 * partitions
 
@@ -42,7 +40,7 @@ if __name__ == "__main__":
 y = random() * 2 - 1
 return 1 if x ** 2 + y ** 2 < 1 else 0
 
-count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+count = spark.sparkContext.parallelize(range(1, n + 1), 
partitions).map(f).reduce(add)
 print("Pi is roughly %f" % (4.0 * count / n))
 
 spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/f41f433b/examples/src/main/python/transitive_closure.py
--
diff --git a/examples/src/main/python/transitive_closure.py 
b/examples/src/main/python/transitive_closure.py
index d88ea94..49551d4 100755
--- a/examples/src/main/python/transitive_closure.py
+++ b/examples/src/main/python/transitive_closure.py
@@ -46,10 +46,8 @@ if __name__ == "__main__":
 .appName("PythonTransitiveClosure")\
 .getOrCreate()
 
-sc = spark.sparkContext
-
 partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
-tc = sc.parallelize(generateGraph(), partitions).cache()
+tc = spark.sparkContext.parallelize(generateGraph(), partitions).cache()
 
 # Linear transitive closure: each round grows paths by one edge,
 # by joining the graph's edges with the already-discovered paths.

http://git-wip-us.apache.org/repos/asf/spark/blob/f41f433b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index 4b5e36c..3bff7ce 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -107,16 +107,13 @@ object DFSReadWriteTest {
   .appName("DFS Read Write Test")
   .getOrCreate()
 
-pri

spark git commit: [SPARK-15773][CORE][EXAMPLE] Avoid creating local variable `sc` in examples if possible

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 127a6678d -> 2022afe57


[SPARK-15773][CORE][EXAMPLE] Avoid creating local variable `sc` in examples if 
possible

## What changes were proposed in this pull request?

Instead of using a local variable `sc` as in the following example, this PR 
uses `spark.sparkContext` directly. This makes the examples more concise and 
also fixes some misleading code, i.e., creating a SparkContext from a 
SparkSession.
```
-println("Creating SparkContext")
-val sc = spark.sparkContext
-
 println("Writing local file to DFS")
 val dfsFilename = dfsDirPath + "/dfs_read_write_test"
-val fileRDD = sc.parallelize(fileContents)
+val fileRDD = spark.sparkContext.parallelize(fileContents)
```

This will change 12 files (+30 lines, -52 lines).

## How was this patch tested?

Manual.

Author: Dongjoon Hyun 

Closes #13520 from dongjoon-hyun/SPARK-15773.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2022afe5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2022afe5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2022afe5

Branch: refs/heads/master
Commit: 2022afe57dbf8cb0c9909399962c4a3649e0601c
Parents: 127a667
Author: Dongjoon Hyun 
Authored: Fri Jun 10 15:40:29 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 15:40:29 2016 -0700

--
 examples/src/main/python/pi.py  |  4 +---
 examples/src/main/python/transitive_closure.py  |  4 +---
 .../apache/spark/examples/DFSReadWriteTest.scala|  7 ++-
 .../spark/examples/ExceptionHandlingTest.scala  |  3 +--
 .../org/apache/spark/examples/GroupByTest.scala | 14 ++
 .../apache/spark/examples/MultiBroadcastTest.scala  |  8 +++-
 .../spark/examples/SimpleSkewedGroupByTest.scala| 16 +++-
 .../apache/spark/examples/SkewedGroupByTest.scala   | 13 +
 .../scala/org/apache/spark/examples/SparkLR.scala   |  4 +---
 .../scala/org/apache/spark/examples/SparkPi.scala   |  3 +--
 .../scala/org/apache/spark/examples/SparkTC.scala   |  3 +--
 .../spark/examples/sql/hive/HiveFromSpark.scala |  3 +--
 12 files changed, 30 insertions(+), 52 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/python/pi.py
--
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index b39d710..e3f0c4a 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -32,8 +32,6 @@ if __name__ == "__main__":
 .appName("PythonPi")\
 .getOrCreate()
 
-sc = spark.sparkContext
-
 partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
 n = 10 * partitions
 
@@ -42,7 +40,7 @@ if __name__ == "__main__":
 y = random() * 2 - 1
 return 1 if x ** 2 + y ** 2 < 1 else 0
 
-count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+count = spark.sparkContext.parallelize(range(1, n + 1), 
partitions).map(f).reduce(add)
 print("Pi is roughly %f" % (4.0 * count / n))
 
 spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/python/transitive_closure.py
--
diff --git a/examples/src/main/python/transitive_closure.py 
b/examples/src/main/python/transitive_closure.py
index d88ea94..49551d4 100755
--- a/examples/src/main/python/transitive_closure.py
+++ b/examples/src/main/python/transitive_closure.py
@@ -46,10 +46,8 @@ if __name__ == "__main__":
 .appName("PythonTransitiveClosure")\
 .getOrCreate()
 
-sc = spark.sparkContext
-
 partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
-tc = sc.parallelize(generateGraph(), partitions).cache()
+tc = spark.sparkContext.parallelize(generateGraph(), partitions).cache()
 
 # Linear transitive closure: each round grows paths by one edge,
 # by joining the graph's edges with the already-discovered paths.

http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index 4b5e36c..3bff7ce 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -107,16 +107,13 @@ object DFSReadWriteTest {
   .appName("DFS Read Write Test")
   .getOrCreate()
 
-println("Creating SparkContext")
-val sc = spark.sparkContext
-
 println("Writing local file to DFS")

spark git commit: [SPARK-15489][SQL] Dataset kryo encoder won't load custom user settings

2016-06-10 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master aec502d91 -> 127a6678d


[SPARK-15489][SQL] Dataset kryo encoder won't load custom user settings

## What changes were proposed in this pull request?

Serializer instantiation now considers the existing SparkConf (taken from the running 
SparkEnv) instead of always creating a fresh one, so user-supplied Kryo settings are honored.
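
For context, a rough sketch of the kind of setup this fixes; the registrator and app name below are hypothetical stand-ins (the original report used Guava's `ImmutableList` with kryo-serializers' `Immutable*Serializer` implementations):

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.{Encoders, SparkSession}

// Hypothetical registrator standing in for the real ImmutableList serializer setup.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[java.util.ArrayList[_]])
  }
}

object KryoEncoderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("KryoEncoderExample")
      .config("spark.kryo.registrator", classOf[MyRegistrator].getName)
      .getOrCreate()

    // With this change, the serializer behind the Kryo encoder is built from the
    // running SparkEnv's conf, so the registrator above is picked up instead of
    // being lost to a freshly created SparkConf.
    val ds = spark.createDataset(Seq(1, 2, 3))(Encoders.kryo[Int])
    ds.collect().foreach(println)

    spark.stop()
  }
}
```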

## How was this patch tested?
Manual test with Guava's `ImmutableList` and `kryo-serializers`' `Immutable*Serializer` 
implementations.

Added a test suite.


Author: Sela 

Closes #13424 from amitsela/SPARK-15489.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/127a6678
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/127a6678
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/127a6678

Branch: refs/heads/master
Commit: 127a6678d7af6b5164a115be7c64525bb80001fe
Parents: aec502d
Author: Sela 
Authored: Fri Jun 10 14:36:51 2016 -0700
Committer: Michael Armbrust 
Committed: Fri Jun 10 14:36:51 2016 -0700

--
 .../catalyst/expressions/objects/objects.scala  | 30 ++---
 .../sql/DatasetSerializerRegistratorSuite.scala | 68 
 2 files changed, 89 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/127a6678/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 87c8a2e..c597a2a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -22,7 +22,7 @@ import java.lang.reflect.Modifier
 import scala.language.existentials
 import scala.reflect.ClassTag
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.serializer._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
@@ -547,11 +547,17 @@ case class EncodeUsingSerializer(child: Expression, kryo: 
Boolean)
 (classOf[JavaSerializer].getName, 
classOf[JavaSerializerInstance].getName)
   }
 }
+// try conf from env, otherwise create a new one
+val env = s"${classOf[SparkEnv].getName}.get()"
 val sparkConf = s"new ${classOf[SparkConf].getName}()"
-ctx.addMutableState(
-  serializerInstanceClass,
-  serializer,
-  s"$serializer = ($serializerInstanceClass) new 
$serializerClass($sparkConf).newInstance();")
+val serializerInit = s"""
+  if ($env == null) {
+$serializer = ($serializerInstanceClass) new 
$serializerClass($sparkConf).newInstance();
+   } else {
+ $serializer = ($serializerInstanceClass) new 
$serializerClass($env.conf()).newInstance();
+   }
+ """
+ctx.addMutableState(serializerInstanceClass, serializer, serializerInit)
 
 // Code to serialize.
 val input = child.genCode(ctx)
@@ -587,11 +593,17 @@ case class DecodeUsingSerializer[T](child: Expression, 
tag: ClassTag[T], kryo: B
 (classOf[JavaSerializer].getName, 
classOf[JavaSerializerInstance].getName)
   }
 }
+// try conf from env, otherwise create a new one
+val env = s"${classOf[SparkEnv].getName}.get()"
 val sparkConf = s"new ${classOf[SparkConf].getName}()"
-ctx.addMutableState(
-  serializerInstanceClass,
-  serializer,
-  s"$serializer = ($serializerInstanceClass) new 
$serializerClass($sparkConf).newInstance();")
+val serializerInit = s"""
+  if ($env == null) {
+$serializer = ($serializerInstanceClass) new 
$serializerClass($sparkConf).newInstance();
+   } else {
+ $serializer = ($serializerInstanceClass) new 
$serializerClass($env.conf()).newInstance();
+   }
+ """
+ctx.addMutableState(serializerInstanceClass, serializer, serializerInit)
 
 // Code to deserialize.
 val input = child.genCode(ctx)

http://git-wip-us.apache.org/repos/asf/spark/blob/127a6678/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
new file mode 100644
index 000..0f3d0ce
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreem

spark git commit: [SPARK-15489][SQL] Dataset kryo encoder won't load custom user settings

2016-06-10 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 bc53422ad -> e6ebb547b


[SPARK-15489][SQL] Dataset kryo encoder won't load custom user settings

## What changes were proposed in this pull request?

Serializer instantiation now considers the existing SparkConf (taken from the running 
SparkEnv) instead of always creating a fresh one, so user-supplied Kryo settings are honored.

## How was this patch tested?
Manual test with Guava's `ImmutableList` and `kryo-serializers`' `Immutable*Serializer` 
implementations.

Added a test suite.


Author: Sela 

Closes #13424 from amitsela/SPARK-15489.

(cherry picked from commit 127a6678d7af6b5164a115be7c64525bb80001fe)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6ebb547
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6ebb547
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6ebb547

Branch: refs/heads/branch-2.0
Commit: e6ebb547b197f906b9706847ad871b337b4a9e7f
Parents: bc53422
Author: Sela 
Authored: Fri Jun 10 14:36:51 2016 -0700
Committer: Michael Armbrust 
Committed: Fri Jun 10 14:36:59 2016 -0700

--
 .../catalyst/expressions/objects/objects.scala  | 30 ++---
 .../sql/DatasetSerializerRegistratorSuite.scala | 68 
 2 files changed, 89 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e6ebb547/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 87c8a2e..c597a2a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -22,7 +22,7 @@ import java.lang.reflect.Modifier
 import scala.language.existentials
 import scala.reflect.ClassTag
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.serializer._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
@@ -547,11 +547,17 @@ case class EncodeUsingSerializer(child: Expression, kryo: 
Boolean)
 (classOf[JavaSerializer].getName, 
classOf[JavaSerializerInstance].getName)
   }
 }
+// try conf from env, otherwise create a new one
+val env = s"${classOf[SparkEnv].getName}.get()"
 val sparkConf = s"new ${classOf[SparkConf].getName}()"
-ctx.addMutableState(
-  serializerInstanceClass,
-  serializer,
-  s"$serializer = ($serializerInstanceClass) new 
$serializerClass($sparkConf).newInstance();")
+val serializerInit = s"""
+  if ($env == null) {
+$serializer = ($serializerInstanceClass) new 
$serializerClass($sparkConf).newInstance();
+   } else {
+ $serializer = ($serializerInstanceClass) new 
$serializerClass($env.conf()).newInstance();
+   }
+ """
+ctx.addMutableState(serializerInstanceClass, serializer, serializerInit)
 
 // Code to serialize.
 val input = child.genCode(ctx)
@@ -587,11 +593,17 @@ case class DecodeUsingSerializer[T](child: Expression, 
tag: ClassTag[T], kryo: B
 (classOf[JavaSerializer].getName, 
classOf[JavaSerializerInstance].getName)
   }
 }
+// try conf from env, otherwise create a new one
+val env = s"${classOf[SparkEnv].getName}.get()"
 val sparkConf = s"new ${classOf[SparkConf].getName}()"
-ctx.addMutableState(
-  serializerInstanceClass,
-  serializer,
-  s"$serializer = ($serializerInstanceClass) new 
$serializerClass($sparkConf).newInstance();")
+val serializerInit = s"""
+  if ($env == null) {
+$serializer = ($serializerInstanceClass) new 
$serializerClass($sparkConf).newInstance();
+   } else {
+ $serializer = ($serializerInstanceClass) new 
$serializerClass($env.conf()).newInstance();
+   }
+ """
+ctx.addMutableState(serializerInstanceClass, serializer, serializerInit)
 
 // Code to deserialize.
 val input = child.genCode(ctx)

http://git-wip-us.apache.org/repos/asf/spark/blob/e6ebb547/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
new file mode 100644
index 000..0f3d0ce
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
@@ -0,0 +1,

spark git commit: [SPARK-15654] [SQL] fix non-splitable files for text based file formats

2016-06-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f2e5d6d0f -> bc53422ad


[SPARK-15654] [SQL] fix non-splitable files for text based file formats

## What changes were proposed in this pull request?

Currently, we always split files when they are bigger than maxSplitBytes, but Hadoop's 
LineRecordReader does not respect the splits for compressed files correctly, so FileFormat 
should have an API to check whether a file can be split or not.

This PR is based on #13442, closes #13442
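
As a rough sketch (not the exact implementation), a text-based format can decide splittability from the Hadoop compression codec, using the `isSplitable(sparkSession, options, path)` hook visible in the diff below; the object name is made up:

```
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.spark.sql.SparkSession

object SplittabilityCheck {
  // A file is treated as splittable when it is uncompressed (no codec found)
  // or when its codec supports splitting (e.g. bzip2).
  def isSplitable(sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = {
    val factory = new CompressionCodecFactory(sparkSession.sparkContext.hadoopConfiguration)
    val codec = factory.getCodec(path) // null for uncompressed files
    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
  }
}
```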

## How was this patch tested?

add regression tests.

Author: Davies Liu 

Closes #13531 from davies/fix_split.

(cherry picked from commit aec502d9114ad8e18bfbbd63f38780e076d326d1)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc53422a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc53422a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc53422a

Branch: refs/heads/branch-2.0
Commit: bc53422ad54460069f0e36061c6be5ef76b4dbaa
Parents: f2e5d6d
Author: Davies Liu 
Authored: Fri Jun 10 14:32:43 2016 -0700
Committer: Davies Liu 
Committed: Fri Jun 10 14:32:53 2016 -0700

--
 .../spark/ml/source/libsvm/LibSVMRelation.scala |  2 +-
 .../datasources/FileSourceStrategy.scala| 17 ++---
 .../datasources/csv/CSVFileFormat.scala |  2 +-
 .../datasources/fileSourceInterfaces.scala  | 33 +++--
 .../datasources/json/JsonFileFormat.scala   |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala |  7 
 .../datasources/text/TextFileFormat.scala   |  2 +-
 .../datasources/FileSourceStrategySuite.scala   | 37 +++-
 .../execution/datasources/text/TextSuite.scala  | 17 +
 .../spark/sql/hive/orc/OrcFileFormat.scala  |  7 
 .../spark/sql/sources/SimpleTextRelation.scala  |  2 +-
 11 files changed, 115 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bc53422a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 7629369..b5b2a68 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -112,7 +112,7 @@ private[libsvm] class LibSVMOutputWriter(
  */
 // If this is moved or renamed, please update DataSource's 
backwardCompatibilityMap.
 @Since("1.6.0")
-class LibSVMFileFormat extends FileFormat with DataSourceRegister {
+class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
   @Since("1.6.0")
   override def shortName(): String = "libsvm"

http://git-wip-us.apache.org/repos/asf/spark/blob/bc53422a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 7503285..13a86bf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -151,11 +151,18 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
   val splitFiles = selectedPartitions.flatMap { partition =>
 partition.files.flatMap { file =>
   val blockLocations = getBlockLocations(file)
-  (0L until file.getLen by maxSplitBytes).map { offset =>
-val remaining = file.getLen - offset
-val size = if (remaining > maxSplitBytes) maxSplitBytes else 
remaining
-val hosts = getBlockHosts(blockLocations, offset, size)
-PartitionedFile(partition.values, file.getPath.toUri.toString, 
offset, size, hosts)
+  if (files.fileFormat.isSplitable(files.sparkSession, 
files.options, file.getPath)) {
+(0L until file.getLen by maxSplitBytes).map { offset =>
+  val remaining = file.getLen - offset
+  val size = if (remaining > maxSplitBytes) maxSplitBytes else 
remaining
+  val hosts = getBlockHosts(blockLocations, offset, size)
+  PartitionedFile(
+partition.values, file.getPath.toUri.toString, offset, 
size, hosts)
+}
+  } else {
+val hosts = getBlockHosts(blockLocations, 0, file.getLen)
+   

spark git commit: [SPARK-15654] [SQL] fix non-splitable files for text based file formats

2016-06-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master e05a2feeb -> aec502d91


[SPARK-15654] [SQL] fix non-splitable files for text based file formats

## What changes were proposed in this pull request?

Currently, we always split files when they are bigger than maxSplitBytes, but Hadoop's 
LineRecordReader does not respect the splits for compressed files correctly, so FileFormat 
should have an API to check whether a file can be split or not.

This PR is based on #13442, closes #13442

## How was this patch tested?

add regression tests.

Author: Davies Liu 

Closes #13531 from davies/fix_split.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aec502d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aec502d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aec502d9

Branch: refs/heads/master
Commit: aec502d9114ad8e18bfbbd63f38780e076d326d1
Parents: e05a2fe
Author: Davies Liu 
Authored: Fri Jun 10 14:32:43 2016 -0700
Committer: Davies Liu 
Committed: Fri Jun 10 14:32:43 2016 -0700

--
 .../spark/ml/source/libsvm/LibSVMRelation.scala |  2 +-
 .../datasources/FileSourceStrategy.scala| 17 ++---
 .../datasources/csv/CSVFileFormat.scala |  2 +-
 .../datasources/fileSourceInterfaces.scala  | 33 +++--
 .../datasources/json/JsonFileFormat.scala   |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala |  7 
 .../datasources/text/TextFileFormat.scala   |  2 +-
 .../datasources/FileSourceStrategySuite.scala   | 37 +++-
 .../execution/datasources/text/TextSuite.scala  | 17 +
 .../spark/sql/hive/orc/OrcFileFormat.scala  |  7 
 .../spark/sql/sources/SimpleTextRelation.scala  |  2 +-
 11 files changed, 115 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 7629369..b5b2a68 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -112,7 +112,7 @@ private[libsvm] class LibSVMOutputWriter(
  */
 // If this is moved or renamed, please update DataSource's 
backwardCompatibilityMap.
 @Since("1.6.0")
-class LibSVMFileFormat extends FileFormat with DataSourceRegister {
+class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
   @Since("1.6.0")
   override def shortName(): String = "libsvm"

http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 7503285..13a86bf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -151,11 +151,18 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
   val splitFiles = selectedPartitions.flatMap { partition =>
 partition.files.flatMap { file =>
   val blockLocations = getBlockLocations(file)
-  (0L until file.getLen by maxSplitBytes).map { offset =>
-val remaining = file.getLen - offset
-val size = if (remaining > maxSplitBytes) maxSplitBytes else 
remaining
-val hosts = getBlockHosts(blockLocations, offset, size)
-PartitionedFile(partition.values, file.getPath.toUri.toString, 
offset, size, hosts)
+  if (files.fileFormat.isSplitable(files.sparkSession, 
files.options, file.getPath)) {
+(0L until file.getLen by maxSplitBytes).map { offset =>
+  val remaining = file.getLen - offset
+  val size = if (remaining > maxSplitBytes) maxSplitBytes else 
remaining
+  val hosts = getBlockHosts(blockLocations, offset, size)
+  PartitionedFile(
+partition.values, file.getPath.toUri.toString, offset, 
size, hosts)
+}
+  } else {
+val hosts = getBlockHosts(blockLocations, 0, file.getLen)
+Seq(PartitionedFile(
+  partition.values, file.getPath.toUri.toString, 0, 
file.getL

spark git commit: [SPARK-15825] [SQL] Fix SMJ invalid results

2016-06-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 026eb9064 -> e05a2feeb


[SPARK-15825] [SQL] Fix SMJ invalid results

## What changes were proposed in this pull request?
Code-generated `SortMergeJoin` produced wrong results when using structs as keys. This 
could (eventually) be traced back to the use of a wrong row reference when comparing structs.
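
A minimal reproduction sketch (spark-shell style, so `spark` is assumed to exist), mirroring the regression test added below; the broadcast threshold is lowered only to force a sort-merge join on such a tiny dataset:

```
import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)

def df = spark.range(3).selectExpr("struct(id, id) AS key", "id AS value")
val left = df.selectExpr("key", "concat('L', value) AS value").alias("left")
val right = df.selectExpr("key", "concat('R', value) AS value").alias("right")

// Each key should match only its counterpart: (L0, R0), (L1, R1), (L2, R2).
// Before the fix, the generated struct comparison could read from the wrong row.
left.join(right, col("left.key") === col("right.key")).show()
```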

## How was this patch tested?
TBD

Author: Herman van Hovell 

Closes #13589 from hvanhovell/SPARK-15822.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e05a2fee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e05a2fee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e05a2fee

Branch: refs/heads/master
Commit: e05a2feebe928df691d5a8f42f22e088c6263dcf
Parents: 026eb90
Author: Herman van Hovell 
Authored: Fri Jun 10 14:29:05 2016 -0700
Committer: Davies Liu 
Committed: Fri Jun 10 14:29:05 2016 -0700

--
 .../catalyst/expressions/codegen/CodeGenerator.scala |  1 +
 .../spark/sql/execution/joins/InnerJoinSuite.scala   | 15 +++
 2 files changed, 16 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e05a2fee/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 9657f26..ca20292 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -490,6 +490,7 @@ class CodegenContext {
   addNewFunction(compareFunc, funcCode)
   s"this.$compareFunc($c1, $c2)"
 case schema: StructType =>
+  INPUT_ROW = "i"
   val comparisons = GenerateOrdering.genComparisons(this, schema)
   val compareFunc = freshName("compareStruct")
   val funcCode: String =

http://git-wip-us.apache.org/repos/asf/spark/blob/e05a2fee/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 27f6abc..35dab63 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -271,4 +271,19 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
   )
 )
   }
+
+  {
+def df: DataFrame = spark.range(3).selectExpr("struct(id, id) as key", "id 
as value")
+lazy val left = df.selectExpr("key", "concat('L', value) as 
value").alias("left")
+lazy val right = df.selectExpr("key", "concat('R', value) as 
value").alias("right")
+testInnerJoin(
+  "SPARK-15822 - test structs as keys",
+  left,
+  right,
+  () => (left.col("key") === right.col("key")).expr,
+  Seq(
+(Row(0, 0), "L0", Row(0, 0), "R0"),
+(Row(1, 1), "L1", Row(1, 1), "R1"),
+(Row(2, 2), "L2", Row(2, 2), "R2")))
+  }
 }





spark git commit: [SPARK-15825] [SQL] Fix SMJ invalid results

2016-06-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 80b8711b3 -> f2e5d6d0f


[SPARK-15825] [SQL] Fix SMJ invalid results

## What changes were proposed in this pull request?
Code-generated `SortMergeJoin` produced wrong results when using structs as keys. This 
could (eventually) be traced back to the use of a wrong row reference when comparing structs.

## How was this patch tested?
TBD

Author: Herman van Hovell 

Closes #13589 from hvanhovell/SPARK-15822.

(cherry picked from commit e05a2feebe928df691d5a8f42f22e088c6263dcf)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2e5d6d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2e5d6d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2e5d6d0

Branch: refs/heads/branch-2.0
Commit: f2e5d6d0f446d7f1d6d8c3208871074abd669482
Parents: 80b8711
Author: Herman van Hovell 
Authored: Fri Jun 10 14:29:05 2016 -0700
Committer: Davies Liu 
Committed: Fri Jun 10 14:31:40 2016 -0700

--
 .../catalyst/expressions/codegen/CodeGenerator.scala |  1 +
 .../spark/sql/execution/joins/InnerJoinSuite.scala   | 15 +++
 2 files changed, 16 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f2e5d6d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 9657f26..ca20292 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -490,6 +490,7 @@ class CodegenContext {
   addNewFunction(compareFunc, funcCode)
   s"this.$compareFunc($c1, $c2)"
 case schema: StructType =>
+  INPUT_ROW = "i"
   val comparisons = GenerateOrdering.genComparisons(this, schema)
   val compareFunc = freshName("compareStruct")
   val funcCode: String =

http://git-wip-us.apache.org/repos/asf/spark/blob/f2e5d6d0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 27f6abc..35dab63 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -271,4 +271,19 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
   )
 )
   }
+
+  {
+def df: DataFrame = spark.range(3).selectExpr("struct(id, id) as key", "id 
as value")
+lazy val left = df.selectExpr("key", "concat('L', value) as 
value").alias("left")
+lazy val right = df.selectExpr("key", "concat('R', value) as 
value").alias("right")
+testInnerJoin(
+  "SPARK-15822 - test structs as keys",
+  left,
+  right,
+  () => (left.col("key") === right.col("key")).expr,
+  Seq(
+(Row(0, 0), "L0", Row(0, 0), "R0"),
+(Row(1, 1), "L1", Row(1, 1), "R1"),
+(Row(2, 2), "L2", Row(2, 2), "R2")))
+  }
 }





spark git commit: [SPARK-15738][PYSPARK][ML] Adding Pyspark ml RFormula __str__ method similar to Scala API

2016-06-10 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8b6742a37 -> 80b8711b3


[SPARK-15738][PYSPARK][ML] Adding Pyspark ml RFormula __str__ method similar to 
Scala API

## What changes were proposed in this pull request?
This adds __str__ to RFormula and its model so that they show the formula param that was 
set and the resolved formula. The equivalent is already present in the Scala API and was 
found missing in PySpark during the Spark 2.0 coverage review.
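
For reference, the Scala-side behavior that the new PySpark `__str__` mirrors (a sketch; the exact uid suffix will differ per instance):

```
import org.apache.spark.ml.feature.RFormula

val rf = new RFormula().setFormula("y ~ x + s")
// Prints something like: RFormula(y ~ x + s) (uid=rFormula_...)
println(rf.toString)
```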

## How was this patch tested?
run pyspark-ml tests locally

Author: Bryan Cutler 

Closes #13481 from BryanCutler/pyspark-ml-rformula_str-SPARK-15738.

(cherry picked from commit 7d7a0a5e0749909e97d90188707cc9220a1bb73a)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80b8711b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80b8711b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80b8711b

Branch: refs/heads/branch-2.0
Commit: 80b8711b342c5a569fe89d7ffbdd552653b9b6ec
Parents: 8b6742a
Author: Bryan Cutler 
Authored: Fri Jun 10 11:27:30 2016 -0700
Committer: Yanbo Liang 
Committed: Fri Jun 10 14:01:55 2016 -0700

--
 .../scala/org/apache/spark/ml/feature/RFormula.scala  |  2 +-
 .../org/apache/spark/ml/feature/RFormulaParser.scala  | 14 +-
 python/pyspark/ml/feature.py  | 12 
 3 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/80b8711b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index 2916b6d..a7ca0fe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -182,7 +182,7 @@ class RFormula(override val uid: String)
 
   override def copy(extra: ParamMap): RFormula = defaultCopy(extra)
 
-  override def toString: String = s"RFormula(${get(formula)}) (uid=$uid)"
+  override def toString: String = s"RFormula(${get(formula).getOrElse("")}) 
(uid=$uid)"
 }
 
 @Since("2.0.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/80b8711b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
index 19aecff..2dd565a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
@@ -126,7 +126,19 @@ private[ml] case class ParsedRFormula(label: ColumnRef, 
terms: Seq[Term]) {
  * @param hasIntercept whether the formula specifies fitting with an intercept.
  */
 private[ml] case class ResolvedRFormula(
-  label: String, terms: Seq[Seq[String]], hasIntercept: Boolean)
+  label: String, terms: Seq[Seq[String]], hasIntercept: Boolean) {
+
+  override def toString: String = {
+val ts = terms.map {
+  case t if t.length > 1 =>
+s"${t.mkString("{", ",", "}")}"
+  case t =>
+t.mkString
+}
+val termStr = ts.mkString("[", ",", "]")
+s"ResolvedRFormula(label=$label, terms=$termStr, 
hasIntercept=$hasIntercept)"
+  }
+}
 
 /**
  * R formula terms. See the R formula docs here for more information:

http://git-wip-us.apache.org/repos/asf/spark/blob/80b8711b/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index bfb2fb7..ca77ac3 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2528,6 +2528,8 @@ class RFormula(JavaEstimator, HasFeaturesCol, 
HasLabelCol, JavaMLReadable, JavaM
 True
 >>> loadedRF.getLabelCol() == rf.getLabelCol()
 True
+>>> str(loadedRF)
+'RFormula(y ~ x + s) (uid=...)'
 >>> modelPath = temp_path + "/rFormulaModel"
 >>> model.save(modelPath)
 >>> loadedModel = RFormulaModel.load(modelPath)
@@ -2542,6 +2544,8 @@ class RFormula(JavaEstimator, HasFeaturesCol, 
HasLabelCol, JavaMLReadable, JavaM
 |0.0|0.0|  a|[0.0,1.0]|  0.0|
 +---+---+---+-+-+
 ...
+>>> str(loadedModel)
+'RFormulaModel(ResolvedRFormula(label=y, terms=[x,s], hasIntercept=true)) 
(uid=...)'
 
 .. versionadded:: 1.5.0
 """
@@ -2586,6 +2590,10 @@ class RFormula(JavaEstimator, HasFeaturesCol, 
HasLabelCol, JavaMLReadable, JavaM
 def _create_model(self, java_model):
 return RFormulaModel(java_model)
 
+def __str__(self):
+formulaStr = self.getFormula()

spark git commit: [SPARK-15875] Try to use Seq.isEmpty and Seq.nonEmpty instead of Seq.length == 0 and Seq.length > 0

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 96bb1476c -> 8b6742a37


[SPARK-15875] Try to use Seq.isEmpty and Seq.nonEmpty instead of Seq.length == 
0 and Seq.length > 0

## What changes were proposed in this pull request?

In Scala, immutable.List.length is an expensive (linear-time) operation, so we should 
avoid using Seq.length == 0 or Seq.length > 0 and use Seq.isEmpty and Seq.nonEmpty instead.
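
A small illustration of the preferred idiom (the list here is arbitrary):

```
// length walks the whole immutable list (O(n)); isEmpty/nonEmpty only look at the head (O(1)).
val xs: Seq[Int] = List.fill(1000000)(1)
if (xs.nonEmpty) println(s"first = ${xs.head}")  // instead of: xs.length > 0
if (xs.isEmpty) println("empty")                 // instead of: xs.length == 0
```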

## How was this patch tested?
existing tests

Author: wangyang 

Closes #13601 from yangw1234/isEmpty.

(cherry picked from commit 026eb90644be7685971dacaabae67a293edd0133)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b6742a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b6742a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b6742a3

Branch: refs/heads/branch-2.0
Commit: 8b6742a37d35520eedaee5f3112529136b3a21e4
Parents: 96bb147
Author: wangyang 
Authored: Fri Jun 10 13:10:03 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 13:10:09 2016 -0700

--
 .../org/apache/spark/api/python/PythonWorkerFactory.scala  | 2 +-
 .../scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala  | 2 +-
 .../apache/spark/util/collection/ExternalAppendOnlyMap.scala   | 6 +++---
 .../main/scala/org/apache/spark/mllib/clustering/KMeans.scala  | 2 +-
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala  | 2 +-
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala| 2 +-
 .../spark/sql/execution/aggregate/SortAggregateExec.scala  | 4 ++--
 .../org/apache/spark/sql/execution/metric/SQLMetrics.scala | 2 +-
 .../apache/spark/streaming/dstream/TransformedDStream.scala| 2 +-
 9 files changed, 12 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8b6742a3/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 3df87f6..6a5e6f7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -235,7 +235,7 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
   }
 
   private def cleanupIdleWorkers() {
-while (idleWorkers.length > 0) {
+while (idleWorkers.nonEmpty) {
   val worker = idleWorkers.dequeue()
   try {
 // the worker will exit after closing the socket

http://git-wip-us.apache.org/repos/asf/spark/blob/8b6742a3/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index b6366f3..d744d67 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -60,7 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
 sc: SparkContext,
 var rdds: Seq[RDD[T]]
   ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
-  require(rdds.length > 0)
+  require(rdds.nonEmpty)
   require(rdds.forall(_.partitioner.isDefined))
   require(rdds.flatMap(_.partitioner).toSet.size == 1,
 "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))

http://git-wip-us.apache.org/repos/asf/spark/blob/8b6742a3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index fc71f83..6ddc72a 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -375,14 +375,14 @@ class ExternalAppendOnlyMap[K, V, C](
 /**
  * Return true if there exists an input stream that still has unvisited 
pairs.
  */
-override def hasNext: Boolean = mergeHeap.length > 0
+override def hasNext: Boolean = mergeHeap.nonEmpty
 
 /**
  * Select a key with the minimum hash, then combine all values with the 
same key from all
  * input streams.
  */
 override def next(): (K, C) = {
-  if (mergeHeap.length == 0) {
+  if (mergeHeap.isEmpty) {
 throw new NoSuchElementException
   }
   // Select a key from the Stre

spark git commit: [SPARK-15875] Try to use Seq.isEmpty and Seq.nonEmpty instead of Seq.length == 0 and Seq.length > 0

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 865ec32dd -> 026eb9064


[SPARK-15875] Try to use Seq.isEmpty and Seq.nonEmpty instead of Seq.length == 
0 and Seq.length > 0

## What changes were proposed in this pull request?

In Scala, immutable.List.length is an expensive (linear-time) operation, so we should 
avoid using Seq.length == 0 or Seq.length > 0 and use Seq.isEmpty and Seq.nonEmpty instead.

## How was this patch tested?
existing tests

Author: wangyang 

Closes #13601 from yangw1234/isEmpty.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/026eb906
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/026eb906
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/026eb906

Branch: refs/heads/master
Commit: 026eb90644be7685971dacaabae67a293edd0133
Parents: 865ec32
Author: wangyang 
Authored: Fri Jun 10 13:10:03 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 13:10:03 2016 -0700

--
 .../org/apache/spark/api/python/PythonWorkerFactory.scala  | 2 +-
 .../scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala  | 2 +-
 .../apache/spark/util/collection/ExternalAppendOnlyMap.scala   | 6 +++---
 .../main/scala/org/apache/spark/mllib/clustering/KMeans.scala  | 2 +-
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala  | 2 +-
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala| 2 +-
 .../spark/sql/execution/aggregate/SortAggregateExec.scala  | 4 ++--
 .../org/apache/spark/sql/execution/metric/SQLMetrics.scala | 2 +-
 .../apache/spark/streaming/dstream/TransformedDStream.scala| 2 +-
 9 files changed, 12 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/026eb906/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 3df87f6..6a5e6f7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -235,7 +235,7 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
   }
 
   private def cleanupIdleWorkers() {
-while (idleWorkers.length > 0) {
+while (idleWorkers.nonEmpty) {
   val worker = idleWorkers.dequeue()
   try {
 // the worker will exit after closing the socket

http://git-wip-us.apache.org/repos/asf/spark/blob/026eb906/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index b6366f3..d744d67 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -60,7 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
 sc: SparkContext,
 var rdds: Seq[RDD[T]]
   ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
-  require(rdds.length > 0)
+  require(rdds.nonEmpty)
   require(rdds.forall(_.partitioner.isDefined))
   require(rdds.flatMap(_.partitioner).toSet.size == 1,
 "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))

http://git-wip-us.apache.org/repos/asf/spark/blob/026eb906/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index fc71f83..6ddc72a 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -375,14 +375,14 @@ class ExternalAppendOnlyMap[K, V, C](
 /**
  * Return true if there exists an input stream that still has unvisited 
pairs.
  */
-override def hasNext: Boolean = mergeHeap.length > 0
+override def hasNext: Boolean = mergeHeap.nonEmpty
 
 /**
  * Select a key with the minimum hash, then combine all values with the 
same key from all
  * input streams.
  */
 override def next(): (K, C) = {
-  if (mergeHeap.length == 0) {
+  if (mergeHeap.isEmpty) {
 throw new NoSuchElementException
   }
   // Select a key from the StreamBuffer that holds the lowest key hash
@@ -397,7 +397,7 @@ class ExternalAppendOnlyMap[K, V, C](
   /

spark git commit: [SPARK-6320][SQL] Move planLater method into GenericStrategy.

2016-06-10 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master fb219029d -> 667d4ea7b


[SPARK-6320][SQL] Move planLater method into GenericStrategy.

## What changes were proposed in this pull request?

This PR moves the `QueryPlanner.planLater()` method into `GenericStrategy` so that extra 
strategies can use `planLater` in their own strategies.
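
For example, an extra strategy registered through `spark.experimental.extraStrategies` can now call `planLater` directly; a sketch under that assumption (the strategy name is made up, and planning `Union` here only re-does what Spark already handles):

```
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}
import org.apache.spark.sql.execution.{SparkPlan, UnionExec}

// Plans a logical Union by asking the planner to plan each child later.
object MyUnionStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case Union(children) => UnionExec(children.map(planLater)) :: Nil
    case _ => Nil
  }
}

// Registration (assuming an active SparkSession named `spark`):
// spark.experimental.extraStrategies = Seq(MyUnionStrategy)
```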

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN 

Closes #13147 from ueshin/issues/SPARK-6320.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/667d4ea7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/667d4ea7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/667d4ea7

Branch: refs/heads/master
Commit: 667d4ea7b35f285954ea7cb719b7c80581e31f4d
Parents: fb21902
Author: Takuya UESHIN 
Authored: Fri Jun 10 13:06:18 2016 -0700
Committer: Michael Armbrust 
Committed: Fri Jun 10 13:06:18 2016 -0700

--
 .../sql/catalyst/planning/QueryPlanner.scala| 58 ++
 .../spark/sql/execution/QueryExecution.scala|  2 +
 .../spark/sql/execution/SparkPlanner.scala  | 13 
 .../spark/sql/execution/SparkStrategies.scala   | 23 +++
 .../scala/org/apache/spark/sql/package.scala|  4 +-
 .../spark/sql/execution/SparkPlannerSuite.scala | 63 
 6 files changed, 151 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/667d4ea7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 8b1a34f..5f694f4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
  * empty list should be returned.
  */
 abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends 
Logging {
+
+  /**
+   * Returns a placeholder for a physical plan that executes `plan`. This 
placeholder will be
+   * filled in automatically by the QueryPlanner using the other execution 
strategies that are
+   * available.
+   */
+  protected def planLater(plan: LogicalPlan): PhysicalPlan
+
   def apply(plan: LogicalPlan): Seq[PhysicalPlan]
 }
 
@@ -47,17 +55,47 @@ abstract class QueryPlanner[PhysicalPlan <: 
TreeNode[PhysicalPlan]] {
   /** A list of execution strategies that can be used by the planner */
   def strategies: Seq[GenericStrategy[PhysicalPlan]]
 
-  /**
-   * Returns a placeholder for a physical plan that executes `plan`. This 
placeholder will be
-   * filled in automatically by the QueryPlanner using the other execution 
strategies that are
-   * available.
-   */
-  protected def planLater(plan: LogicalPlan): PhysicalPlan = 
this.plan(plan).next()
-
   def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
 // Obviously a lot to do here still...
-val iter = strategies.view.flatMap(_(plan)).toIterator
-assert(iter.hasNext, s"No plan for $plan")
-iter
+
+// Collect physical plan candidates.
+val candidates = strategies.iterator.flatMap(_(plan))
+
+// The candidates may contain placeholders marked as [[planLater]],
+// so try to replace them by their child plans.
+val plans = candidates.flatMap { candidate =>
+  val placeholders = collectPlaceholders(candidate)
+
+  if (placeholders.isEmpty) {
+// Take the candidate as is because it does not contain placeholders.
+Iterator(candidate)
+  } else {
+// Plan the logical plan marked as [[planLater]] and replace the 
placeholders.
+placeholders.iterator.foldLeft(Iterator(candidate)) {
+  case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
+// Plan the logical plan for the placeholder.
+val childPlans = this.plan(logicalPlan)
+
+candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
+  childPlans.map { childPlan =>
+// Replace the placeholder by the child plan
+candidateWithPlaceholders.transformUp {
+  case p if p == placeholder => childPlan
+}
+  }
+}
+}
+  }
+}
+
+val pruned = prunePlans(plans)
+assert(pruned.hasNext, s"No plan for $plan")
+pruned
   }
+
+  /** Collects placeholders marked as [[planLater]] by strategy and its 
[[LogicalPlan]]s */
+  protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, 
LogicalPlan)]
+
+  /** Prunes bad plans to prevent com

spark git commit: [MINOR][X][X] Replace all occurrences of None: Option with Option.empty

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 667d4ea7b -> 865ec32dd


[MINOR][X][X] Replace all occurrences of None: Option with Option.empty

## What changes were proposed in this pull request?
Replace all occurrences of `None: Option[X]` with `Option.empty[X]`
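
For illustration (the variable name and element type are just examples):

```
import org.apache.spark.sql.types.StructType

// Both forms yield an empty, correctly typed Option; Option.empty[X] is the idiomatic one.
var existingSchema = Option.empty[StructType]     // preferred
// var existingSchema = None: Option[StructType]  // replaced pattern
```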

## How was this patch tested?
Existing tests.

Author: Sandeep Singh 

Closes #13591 from techaddict/minor-7.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/865ec32d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/865ec32d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/865ec32d

Branch: refs/heads/master
Commit: 865ec32dd997e63aea01a871d1c7b4947f43c111
Parents: 667d4ea
Author: Sandeep Singh 
Authored: Fri Jun 10 13:06:51 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 13:06:51 2016 -0700

--
 .../org/apache/spark/sql/catalyst/trees/TreeNode.scala|  4 ++--
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala |  2 +-
 .../sql/execution/command/createDataSourceTables.scala|  2 +-
 .../spark/sql/execution/exchange/ShuffleExchange.scala|  2 +-
 .../org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 10 +-
 .../spark/streaming/receiver/ReceivedBlockHandler.scala   |  2 +-
 6 files changed, 11 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/865ec32d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index f924efe..3cc7a1a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -105,7 +105,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
*/
   def find(f: BaseType => Boolean): Option[BaseType] = f(this) match {
 case true => Some(this)
-case false => children.foldLeft(None: Option[BaseType]) { (l, r) => 
l.orElse(r.find(f)) }
+case false => children.foldLeft(Option.empty[BaseType]) { (l, r) => 
l.orElse(r.find(f)) }
   }
 
   /**
@@ -165,7 +165,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
   def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] = {
 val lifted = pf.lift
 lifted(this).orElse {
-  children.foldLeft(None: Option[B]) { (l, r) => 
l.orElse(r.collectFirst(pf)) }
+  children.foldLeft(Option.empty[B]) { (l, r) => 
l.orElse(r.collectFirst(pf)) }
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/865ec32d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 78b74f9..1c2003c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -503,7 +503,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
   private def insertInto(tableIdent: TableIdentifier): Unit = {
 assertNotBucketed("insertInto")
 assertNotStreaming("insertInto() can only be called on non-continuous 
queries")
-val partitions = normalizedParCols.map(_.map(col => col -> (None: 
Option[String])).toMap)
+val partitions = normalizedParCols.map(_.map(col => col -> 
(Option.empty[String])).toMap)
 val overwrite = mode == SaveMode.Overwrite
 
 df.sparkSession.sessionState.executePlan(

http://git-wip-us.apache.org/repos/asf/spark/blob/865ec32d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 66753fa..865e406 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -169,7 +169,7 @@ case class CreateDataSourceTableAsSelectCommand(
 options
   }
 
-var existingSchema = None: Option[StructType]
+var existingSchema = Option.empty[StructType]
 if (sparkSession.sessionState.catalog.tableExists(tableIdent)) {
   // Check if we need to throw an exception or just return.
   mode match {

http://git-wip-us.apache.org

spark git commit: [MINOR][X][X] Replace all occurrences of None: Option with Option.empty

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f15d641e2 -> 96bb1476c


[MINOR][X][X] Replace all occurrences of None: Option with Option.empty

## What changes were proposed in this pull request?
Replace all occurrences of `None: Option[X]` with `Option.empty[X]`

## How was this patch tested?
Existing tests.

Author: Sandeep Singh 

Closes #13591 from techaddict/minor-7.

(cherry picked from commit 865ec32dd997e63aea01a871d1c7b4947f43c111)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96bb1476
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96bb1476
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96bb1476

Branch: refs/heads/branch-2.0
Commit: 96bb1476ce884168f232d3e63aa21b5f7dba474f
Parents: f15d641
Author: Sandeep Singh 
Authored: Fri Jun 10 13:06:51 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 13:06:57 2016 -0700

--
 .../org/apache/spark/sql/catalyst/trees/TreeNode.scala|  4 ++--
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala |  2 +-
 .../sql/execution/command/createDataSourceTables.scala|  2 +-
 .../spark/sql/execution/exchange/ShuffleExchange.scala|  2 +-
 .../org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 10 +-
 .../spark/streaming/receiver/ReceivedBlockHandler.scala   |  2 +-
 6 files changed, 11 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/96bb1476/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index f924efe..3cc7a1a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -105,7 +105,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
*/
   def find(f: BaseType => Boolean): Option[BaseType] = f(this) match {
 case true => Some(this)
-case false => children.foldLeft(None: Option[BaseType]) { (l, r) => 
l.orElse(r.find(f)) }
+case false => children.foldLeft(Option.empty[BaseType]) { (l, r) => 
l.orElse(r.find(f)) }
   }
 
   /**
@@ -165,7 +165,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
   def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] = {
 val lifted = pf.lift
 lifted(this).orElse {
-  children.foldLeft(None: Option[B]) { (l, r) => 
l.orElse(r.collectFirst(pf)) }
+  children.foldLeft(Option.empty[B]) { (l, r) => 
l.orElse(r.collectFirst(pf)) }
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/96bb1476/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 78b74f9..1c2003c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -503,7 +503,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
   private def insertInto(tableIdent: TableIdentifier): Unit = {
 assertNotBucketed("insertInto")
 assertNotStreaming("insertInto() can only be called on non-continuous 
queries")
-val partitions = normalizedParCols.map(_.map(col => col -> (None: 
Option[String])).toMap)
+val partitions = normalizedParCols.map(_.map(col => col -> 
(Option.empty[String])).toMap)
 val overwrite = mode == SaveMode.Overwrite
 
 df.sparkSession.sessionState.executePlan(

http://git-wip-us.apache.org/repos/asf/spark/blob/96bb1476/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 66753fa..865e406 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -169,7 +169,7 @@ case class CreateDataSourceTableAsSelectCommand(
 options
   }
 
-var existingSchema = None: Option[StructType]
+var existingSchema = Option.empty[StructType]
 if (sparkSession.sessionState.catalog.tableExists(tableIdent)) {
   /

spark git commit: [SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

2016-06-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c1390ccbb -> f15d641e2


[SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

## What changes were proposed in this pull request?

It doesn't make sense to specify partitioning parameters when we write data out from 
Datasets/DataFrames into `jdbc` tables or streaming `ForeachWriter`s.

This patch adds an `assertNotPartitioned` check in `DataFrameWriter`.



| operation | should check not partitioned? |
| --- | --- |
| mode |  |
| outputMode |  |
| trigger |  |
| format |  |
| option/options |  |
| partitionBy |  |
| bucketBy |  |
| sortBy |  |
| save |  |
| queryName |  |
| startStream |  |
| foreach | yes |
| insertInto |  |
| saveAsTable |  |
| jdbc | yes |
| json |  |
| parquet |  |
| orc |  |
| text |  |
| csv |  |
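
For illustration, a minimal sketch of the `jdbc` case under this check (the DataFrame `df`, the JDBC URL, the table name, and the connection properties below are made-up placeholders):

```scala
import java.util.Properties

val props = new Properties()  // placeholder connection properties
df.write
  .partitionBy("country")     // partitioning is meaningless for a JDBC table
  .jdbc("jdbc:postgresql://localhost/test", "events", props)
// now fails fast with:
// org.apache.spark.sql.AnalysisException: 'jdbc' does not support partitioning
```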




## How was this patch tested?

New dedicated tests.

Author: Liwei Lin 

Closes #13597 from lw-lin/add-assertNotPartitioned.

(cherry picked from commit fb219029dd1b8d2783c3e202361401048296595c)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f15d641e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f15d641e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f15d641e

Branch: refs/heads/branch-2.0
Commit: f15d641e297d425a8c1b4ba6c93f4f98a3f70d0f
Parents: c1390cc
Author: Liwei Lin 
Authored: Fri Jun 10 13:01:29 2016 -0700
Committer: Shixiong Zhu 
Committed: Fri Jun 10 13:01:37 2016 -0700

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 12 +-
 .../test/DataFrameReaderWriterSuite.scala   | 42 ++--
 .../spark/sql/sources/BucketedWriteSuite.scala  |  8 ++--
 3 files changed, 52 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6ce59e8..78b74f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -432,6 +432,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
*/
   @Experimental
   def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+assertNotPartitioned("foreach")
 assertNotBucketed("foreach")
 assertStreaming(
   "foreach() can only be called on streaming Datasets/DataFrames.")
@@ -562,8 +563,13 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
   private def assertNotBucketed(operation: String): Unit = {
 if (numBuckets.isDefined || sortColumnNames.isDefined) {
-  throw new IllegalArgumentException(
-s"'$operation' does not support bucketing right now.")
+  throw new AnalysisException(s"'$operation' does not support bucketing 
right now")
+}
+  }
+
+  private def assertNotPartitioned(operation: String): Unit = {
+if (partitioningColumns.isDefined) {
+  throw new AnalysisException( s"'$operation' does not support 
partitioning")
 }
   }
 
@@ -646,6 +652,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
* @since 1.4.0
*/
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit 
= {
+assertNotPartitioned("jdbc")
+assertNotBucketed("jdbc")
 assertNotStreaming("jdbc() can only be called on non-continuous queries")
 
 val props = new Properties()

http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index bf6063a..6e0d66a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -455,8 +455,8 @@ class DataFrameReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
   .format("org.apache.spark.sql.streaming.test")
   .stream()
 val w = df.write
-val e = intercept[IllegalArgumentException](w.bucketBy(1, 
"text").startStream())
-assert(e.getMessage == "'startStream' does not support 

spark git commit: [SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

2016-06-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 5c16ad0d5 -> fb219029d


[SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

## What changes were proposed in this pull request?

It doesn't make sense to specify partitioning parameters when writing data out 
from Datasets/DataFrames into `jdbc` tables or streaming `ForeachWriter`s.

This patch adds an `assertNotPartitioned` check in `DataFrameWriter`.



| operation | should check not partitioned? |
| --- | --- |
| mode |  |
| outputMode |  |
| trigger |  |
| format |  |
| option/options |  |
| partitionBy |  |
| bucketBy |  |
| sortBy |  |
| save |  |
| queryName |  |
| startStream |  |
| foreach | yes |
| insertInto |  |
| saveAsTable |  |
| jdbc | yes |
| json |  |
| parquet |  |
| orc |  |
| text |  |
| csv |  |
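
For illustration, a minimal sketch of the `foreach` case under this check (the streaming Dataset `streamingDs` of strings and the trivial writer below are made-up placeholders):

```scala
import org.apache.spark.sql.ForeachWriter

// A trivial writer, just for illustration.
val writer = new ForeachWriter[String] {
  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(value: String): Unit = println(value)
  override def close(errorOrNull: Throwable): Unit = ()
}

streamingDs.write
  .partitionBy("date")   // partitioning is meaningless for a ForeachWriter sink
  .foreach(writer)
// now fails fast with:
// org.apache.spark.sql.AnalysisException: 'foreach' does not support partitioning
```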




## How was this patch tested?

New dedicated tests.

Author: Liwei Lin 

Closes #13597 from lw-lin/add-assertNotPartitioned.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb219029
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb219029
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb219029

Branch: refs/heads/master
Commit: fb219029dd1b8d2783c3e202361401048296595c
Parents: 5c16ad0
Author: Liwei Lin 
Authored: Fri Jun 10 13:01:29 2016 -0700
Committer: Shixiong Zhu 
Committed: Fri Jun 10 13:01:29 2016 -0700

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 12 +-
 .../test/DataFrameReaderWriterSuite.scala   | 42 ++--
 .../spark/sql/sources/BucketedWriteSuite.scala  |  8 ++--
 3 files changed, 52 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fb219029/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6ce59e8..78b74f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -432,6 +432,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
*/
   @Experimental
   def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+assertNotPartitioned("foreach")
 assertNotBucketed("foreach")
 assertStreaming(
   "foreach() can only be called on streaming Datasets/DataFrames.")
@@ -562,8 +563,13 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
   private def assertNotBucketed(operation: String): Unit = {
 if (numBuckets.isDefined || sortColumnNames.isDefined) {
-  throw new IllegalArgumentException(
-s"'$operation' does not support bucketing right now.")
+  throw new AnalysisException(s"'$operation' does not support bucketing 
right now")
+}
+  }
+
+  private def assertNotPartitioned(operation: String): Unit = {
+if (partitioningColumns.isDefined) {
+  throw new AnalysisException( s"'$operation' does not support 
partitioning")
 }
   }
 
@@ -646,6 +652,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
* @since 1.4.0
*/
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit 
= {
+assertNotPartitioned("jdbc")
+assertNotBucketed("jdbc")
 assertNotStreaming("jdbc() can only be called on non-continuous queries")
 
 val props = new Properties()

http://git-wip-us.apache.org/repos/asf/spark/blob/fb219029/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index bf6063a..6e0d66a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -455,8 +455,8 @@ class DataFrameReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
   .format("org.apache.spark.sql.streaming.test")
   .stream()
 val w = df.write
-val e = intercept[IllegalArgumentException](w.bucketBy(1, 
"text").startStream())
-assert(e.getMessage == "'startStream' does not support bucketing right 
now.")
+val e = intercept[AnalysisException](w.bucketBy(1, "text").startStream())
+   

spark git commit: [SPARK-15766][SPARKR] R should export is.nan

2016-06-10 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8dd82f8de -> f895d6d85


[SPARK-15766][SPARKR] R should export is.nan

## What changes were proposed in this pull request?

When reviewing SPARK-15545, we found that `is.nan` is not exported, even though 
it should be.

Add it to the NAMESPACE.

## How was this patch tested?

Manual tests.

Author: wm...@hotmail.com 

Closes #13508 from wangmiao1981/unused.

(cherry picked from commit 2c8f40cea113b597fbaf1cdd80a5b8bdd66155fb)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f895d6d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f895d6d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f895d6d8

Branch: refs/heads/branch-2.0
Commit: f895d6d859bc3b259abe8bc39cf8367e3e72a243
Parents: 8dd82f8
Author: wm...@hotmail.com 
Authored: Fri Jun 10 12:46:22 2016 -0700
Committer: Shivaram Venkataraman 
Committed: Fri Jun 10 12:46:31 2016 -0700

--
 R/pkg/NAMESPACE | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f895d6d8/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 239ad06..ba386da 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -185,6 +185,8 @@ exportMethods("%in%",
   "isNaN",
   "isNotNull",
   "isNull",
+  "is.nan",
+  "isnan",
   "kurtosis",
   "lag",
   "last",





spark git commit: Revert [SPARK-14485][CORE] ignore task finished for executor lost

2016-06-10 Thread kayousterhout
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f895d6d85 -> c1390ccbb


Revert [SPARK-14485][CORE] ignore task finished for executor lost

This reverts commit 695dbc816a6d70289abeb145cb62ff4e62b3f49b.

This change is being reverted because it hurts performance of some jobs, and
only helps in a narrow set of cases.  For more discussion, refer to the JIRA.

Author: Kay Ousterhout 

Closes #13580 from kayousterhout/revert-SPARK-14485.

(cherry picked from commit 5c16ad0d522e5124a6977533077afb7b38fc42a1)
Signed-off-by: Kay Ousterhout 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1390ccb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1390ccb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1390ccb

Branch: refs/heads/branch-2.0
Commit: c1390ccbb2968156245e267e6c5cd2a27f7d0121
Parents: f895d6d
Author: Kay Ousterhout 
Authored: Fri Jun 10 12:50:27 2016 -0700
Committer: Kay Ousterhout 
Committed: Fri Jun 10 12:51:29 2016 -0700

--
 .../apache/spark/scheduler/TaskSchedulerImpl.scala| 14 +-
 1 file changed, 1 insertion(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c1390ccb/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d6f58e4..01e85ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -346,11 +346,9 @@ private[spark] class TaskSchedulerImpl(
 }
 taskIdToTaskSetManager.get(tid) match {
   case Some(taskSet) =>
-var executorId: String = null
 if (TaskState.isFinished(state)) {
   taskIdToTaskSetManager.remove(tid)
   taskIdToExecutorId.remove(tid).foreach { execId =>
-executorId = execId
 if (executorIdToTaskCount.contains(execId)) {
   executorIdToTaskCount(execId) -= 1
 }
@@ -358,17 +356,7 @@ private[spark] class TaskSchedulerImpl(
 }
 if (state == TaskState.FINISHED) {
   taskSet.removeRunningTask(tid)
-  // In some case, executor has already been removed by driver for 
heartbeats timeout,
-  // but at sometime, before executor killed by cluster, the task 
of running on this
-  // executor is finished and return task success state to driver. 
However, this kinds
-  // of task should be ignored, because the task on this executor 
is already re-queued
-  // by driver. For more details, can check in SPARK-14485.
-  if (executorId != null && 
!executorIdToTaskCount.contains(executorId)) {
-logInfo(s"Ignoring update with state $state for TID $tid 
because its executor " +
-  s"has already been removed by driver")
-  } else {
-taskResultGetter.enqueueSuccessfulTask(taskSet, tid, 
serializedData)
-  }
+  taskResultGetter.enqueueSuccessfulTask(taskSet, tid, 
serializedData)
 } else if (Set(TaskState.FAILED, TaskState.KILLED, 
TaskState.LOST).contains(state)) {
   taskSet.removeRunningTask(tid)
   taskResultGetter.enqueueFailedTask(taskSet, tid, state, 
serializedData)





spark git commit: Revert [SPARK-14485][CORE] ignore task finished for executor lost

2016-06-10 Thread kayousterhout
Repository: spark
Updated Branches:
  refs/heads/master 2c8f40cea -> 5c16ad0d5


Revert [SPARK-14485][CORE] ignore task finished for executor lost

This reverts commit 695dbc816a6d70289abeb145cb62ff4e62b3f49b.

This change is being reverted because it hurts performance of some jobs, and
only helps in a narrow set of cases.  For more discussion, refer to the JIRA.

Author: Kay Ousterhout 

Closes #13580 from kayousterhout/revert-SPARK-14485.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c16ad0d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c16ad0d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c16ad0d

Branch: refs/heads/master
Commit: 5c16ad0d522e5124a6977533077afb7b38fc42a1
Parents: 2c8f40c
Author: Kay Ousterhout 
Authored: Fri Jun 10 12:50:27 2016 -0700
Committer: Kay Ousterhout 
Committed: Fri Jun 10 12:50:50 2016 -0700

--
 .../apache/spark/scheduler/TaskSchedulerImpl.scala| 14 +-
 1 file changed, 1 insertion(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5c16ad0d/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8e1d957..c3adc28 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -352,11 +352,9 @@ private[spark] class TaskSchedulerImpl(
 }
 taskIdToTaskSetManager.get(tid) match {
   case Some(taskSet) =>
-var executorId: String = null
 if (TaskState.isFinished(state)) {
   taskIdToTaskSetManager.remove(tid)
   taskIdToExecutorId.remove(tid).foreach { execId =>
-executorId = execId
 if (executorIdToTaskCount.contains(execId)) {
   executorIdToTaskCount(execId) -= 1
 }
@@ -364,17 +362,7 @@ private[spark] class TaskSchedulerImpl(
 }
 if (state == TaskState.FINISHED) {
   taskSet.removeRunningTask(tid)
-  // In some case, executor has already been removed by driver for 
heartbeats timeout,
-  // but at sometime, before executor killed by cluster, the task 
of running on this
-  // executor is finished and return task success state to driver. 
However, this kinds
-  // of task should be ignored, because the task on this executor 
is already re-queued
-  // by driver. For more details, can check in SPARK-14485.
-  if (executorId != null && 
!executorIdToTaskCount.contains(executorId)) {
-logInfo(s"Ignoring update with state $state for TID $tid 
because its executor " +
-  s"has already been removed by driver")
-  } else {
-taskResultGetter.enqueueSuccessfulTask(taskSet, tid, 
serializedData)
-  }
+  taskResultGetter.enqueueSuccessfulTask(taskSet, tid, 
serializedData)
 } else if (Set(TaskState.FAILED, TaskState.KILLED, 
TaskState.LOST).contains(state)) {
   taskSet.removeRunningTask(tid)
   taskResultGetter.enqueueFailedTask(taskSet, tid, state, 
serializedData)





spark git commit: [SPARK-15766][SPARKR] R should export is.nan

2016-06-10 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 2413fce9d -> 2c8f40cea


[SPARK-15766][SPARKR] R should export is.nan

## What changes were proposed in this pull request?

When reviewing SPARK-15545, we found that `is.nan` is not exported, even though 
it should be.

Add it to the NAMESPACE.

## How was this patch tested?

Manual tests.

Author: wm...@hotmail.com 

Closes #13508 from wangmiao1981/unused.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c8f40ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c8f40ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c8f40ce

Branch: refs/heads/master
Commit: 2c8f40cea113b597fbaf1cdd80a5b8bdd66155fb
Parents: 2413fce
Author: wm...@hotmail.com 
Authored: Fri Jun 10 12:46:22 2016 -0700
Committer: Shivaram Venkataraman 
Committed: Fri Jun 10 12:46:22 2016 -0700

--
 R/pkg/NAMESPACE | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2c8f40ce/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 239ad06..ba386da 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -185,6 +185,8 @@ exportMethods("%in%",
   "isNaN",
   "isNotNull",
   "isNull",
+  "is.nan",
+  "isnan",
   "kurtosis",
   "lag",
   "last",





spark git commit: [SPARK-15743][SQL] Prevent saving with all-column partitioning

2016-06-10 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 7d7a0a5e0 -> 2413fce9d


[SPARK-15743][SQL] Prevent saving with all-column partitioning

## What changes were proposed in this pull request?

When saving datasets to storage, `partitionBy` provides an easy way to 
construct the directory structure. However, if a user chooses all columns as 
partition columns, some exceptions occur.

- **ORC with all column partitioning**: `AnalysisException` on **future read** 
due to schema inference failure.
 ```scala
scala> 
spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")

scala> spark.read.format("orc").load("/tmp/data").collect()
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at 
/tmp/data. It must be specified manually;
```

- **Parquet with all-column partitioning**: `InvalidSchemaException` on **write 
execution** due to Parquet limitation.
 ```scala
scala> 
spark.range(100).write.format("parquet").mode("overwrite").partitionBy("id").save("/tmp/data")
[Stage 0:>  (0 + 8) / 
8]16/06/02 16:51:17
ERROR Utils: Aborting task
org.apache.parquet.schema.InvalidSchemaException: A group type can not be 
empty. Parquet does not support empty group without leaves. Empty group: 
spark_schema
... (lots of error messages)
```

Although some formats like JSON support all-column partitioning without any 
problem, creating lots of empty directories does not seem like a good idea.

This PR prevents saving with all-column partitioning by consistently raising an 
`AnalysisException` before executing the save operation.
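
For illustration, a minimal sketch of the new behaviour (the exact exception message is an assumption, not quoted from this patch):

```scala
// "id" is the only column of spark.range(10), so listing it in partitionBy means
// every column is a partition column and the write is rejected eagerly.
spark.range(10)
  .write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("id")
  .save("/tmp/data")
// expected: org.apache.spark.sql.AnalysisException raised before any files are
// written, stating that all columns cannot be used as partition columns
// (exact wording assumed)
```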

## How was this patch tested?

Newly added `PartitioningUtilsSuite`.

Author: Dongjoon Hyun 

Closes #13486 from dongjoon-hyun/SPARK-15743.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2413fce9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2413fce9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2413fce9

Branch: refs/heads/master
Commit: 2413fce9d6812a91eeffb4435c2b5b361d23214b
Parents: 7d7a0a5
Author: Dongjoon Hyun 
Authored: Fri Jun 10 12:43:27 2016 -0700
Committer: Michael Armbrust 
Committed: Fri Jun 10 12:43:27 2016 -0700

--
 .../sql/execution/datasources/DataSource.scala  | 32 ++--
 .../datasources/PartitioningUtils.scala |  8 +++--
 .../spark/sql/execution/datasources/rules.scala |  4 +--
 .../execution/streaming/FileStreamSink.scala|  2 +-
 .../test/DataFrameReaderWriterSuite.scala   | 12 
 5 files changed, 37 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5f17fdf..d327302 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the L

spark git commit: [SPARK-15743][SQL] Prevent saving with all-column partitioning

2016-06-10 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 935b6e0e4 -> 8dd82f8de


[SPARK-15743][SQL] Prevent saving with all-column partitioning

## What changes were proposed in this pull request?

When saving datasets to storage, `partitionBy` provides an easy way to 
construct the directory structure. However, if a user chooses all columns as 
partition columns, some exceptions occur.

- **ORC with all column partitioning**: `AnalysisException` on **future read** 
due to schema inference failure.
 ```scala
scala> 
spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")

scala> spark.read.format("orc").load("/tmp/data").collect()
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at 
/tmp/data. It must be specified manually;
```

- **Parquet with all-column partitioning**: `InvalidSchemaException` on **write 
execution** due to Parquet limitation.
 ```scala
scala> 
spark.range(100).write.format("parquet").mode("overwrite").partitionBy("id").save("/tmp/data")
[Stage 0:>  (0 + 8) / 
8]16/06/02 16:51:17
ERROR Utils: Aborting task
org.apache.parquet.schema.InvalidSchemaException: A group type can not be 
empty. Parquet does not support empty group without leaves. Empty group: 
spark_schema
... (lots of error messages)
```

Although some formats like JSON support all-column partitioning without any 
problem, creating lots of empty directories does not seem like a good idea.

This PR prevents saving with all-column partitioning by consistently raising an 
`AnalysisException` before executing the save operation.

## How was this patch tested?

Newly added `PartitioningUtilsSuite`.

Author: Dongjoon Hyun 

Closes #13486 from dongjoon-hyun/SPARK-15743.

(cherry picked from commit 2413fce9d6812a91eeffb4435c2b5b361d23214b)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8dd82f8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8dd82f8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8dd82f8d

Branch: refs/heads/branch-2.0
Commit: 8dd82f8de40a9ef54ef147f1acfb54a40d270c67
Parents: 935b6e0
Author: Dongjoon Hyun 
Authored: Fri Jun 10 12:43:27 2016 -0700
Committer: Michael Armbrust 
Committed: Fri Jun 10 12:43:40 2016 -0700

--
 .../sql/execution/datasources/DataSource.scala  | 32 ++--
 .../datasources/PartitioningUtils.scala |  8 +++--
 .../spark/sql/execution/datasources/rules.scala |  4 +--
 .../execution/streaming/FileStreamSink.scala|  2 +-
 .../test/DataFrameReaderWriterSuite.scala   | 12 
 5 files changed, 37 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8dd82f8d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5f17fdf..d327302 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 

spark git commit: [SPARK-15738][PYSPARK][ML] Adding Pyspark ml RFormula __str__ method similar to Scala API

2016-06-10 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 254bc8c34 -> 7d7a0a5e0


[SPARK-15738][PYSPARK][ML] Adding Pyspark ml RFormula __str__ method similar to 
Scala API

## What changes were proposed in this pull request?
Adds `__str__` to `RFormula` and its model to show the set formula param and 
the resolved formula. This is present in the Scala API but was found missing in 
PySpark during the Spark 2.0 coverage review.

## How was this patch tested?
Ran the pyspark-ml tests locally.

Author: Bryan Cutler 

Closes #13481 from BryanCutler/pyspark-ml-rformula_str-SPARK-15738.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d7a0a5e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d7a0a5e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d7a0a5e

Branch: refs/heads/master
Commit: 7d7a0a5e0749909e97d90188707cc9220a1bb73a
Parents: 254bc8c
Author: Bryan Cutler 
Authored: Fri Jun 10 11:27:30 2016 -0700
Committer: Yanbo Liang 
Committed: Fri Jun 10 11:27:30 2016 -0700

--
 .../scala/org/apache/spark/ml/feature/RFormula.scala  |  2 +-
 .../org/apache/spark/ml/feature/RFormulaParser.scala  | 14 +-
 python/pyspark/ml/feature.py  | 12 
 3 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7d7a0a5e/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index 2916b6d..a7ca0fe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -182,7 +182,7 @@ class RFormula(override val uid: String)
 
   override def copy(extra: ParamMap): RFormula = defaultCopy(extra)
 
-  override def toString: String = s"RFormula(${get(formula)}) (uid=$uid)"
+  override def toString: String = s"RFormula(${get(formula).getOrElse("")}) 
(uid=$uid)"
 }
 
 @Since("2.0.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/7d7a0a5e/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
index 19aecff..2dd565a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
@@ -126,7 +126,19 @@ private[ml] case class ParsedRFormula(label: ColumnRef, 
terms: Seq[Term]) {
  * @param hasIntercept whether the formula specifies fitting with an intercept.
  */
 private[ml] case class ResolvedRFormula(
-  label: String, terms: Seq[Seq[String]], hasIntercept: Boolean)
+  label: String, terms: Seq[Seq[String]], hasIntercept: Boolean) {
+
+  override def toString: String = {
+val ts = terms.map {
+  case t if t.length > 1 =>
+s"${t.mkString("{", ",", "}")}"
+  case t =>
+t.mkString
+}
+val termStr = ts.mkString("[", ",", "]")
+s"ResolvedRFormula(label=$label, terms=$termStr, 
hasIntercept=$hasIntercept)"
+  }
+}
 
 /**
  * R formula terms. See the R formula docs here for more information:

http://git-wip-us.apache.org/repos/asf/spark/blob/7d7a0a5e/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index bfb2fb7..ca77ac3 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2528,6 +2528,8 @@ class RFormula(JavaEstimator, HasFeaturesCol, 
HasLabelCol, JavaMLReadable, JavaM
 True
 >>> loadedRF.getLabelCol() == rf.getLabelCol()
 True
+>>> str(loadedRF)
+'RFormula(y ~ x + s) (uid=...)'
 >>> modelPath = temp_path + "/rFormulaModel"
 >>> model.save(modelPath)
 >>> loadedModel = RFormulaModel.load(modelPath)
@@ -2542,6 +2544,8 @@ class RFormula(JavaEstimator, HasFeaturesCol, 
HasLabelCol, JavaMLReadable, JavaM
 |0.0|0.0|  a|[0.0,1.0]|  0.0|
 +---+---+---+-+-+
 ...
+>>> str(loadedModel)
+'RFormulaModel(ResolvedRFormula(label=y, terms=[x,s], hasIntercept=true)) 
(uid=...)'
 
 .. versionadded:: 1.5.0
 """
@@ -2586,6 +2590,10 @@ class RFormula(JavaEstimator, HasFeaturesCol, 
HasLabelCol, JavaMLReadable, JavaM
 def _create_model(self, java_model):
 return RFormulaModel(java_model)
 
+def __str__(self):
+formulaStr = self.getFormula() if self.isDefined(self.formula) else ""
+return "RFormula(%s) (uid=%s)" % (formulaStr, self.uid)

svn commit: r1747764 - in /spark: css/ site/ site/css/ site/images/

2016-06-10 Thread matei
Author: matei
Date: Fri Jun 10 18:15:52 2016
New Revision: 1747764

URL: http://svn.apache.org/viewvc?rev=1747764&view=rev
Log:
CSS tweaks

Modified:
spark/css/custom.css
spark/site/css/custom.css
spark/site/documentation.html
spark/site/images/spark-logo-reverse.eps
spark/site/images/spark-logo-trademark.png
spark/site/images/spark-logo.eps
spark/site/images/spark-logo.png
spark/site/images/spark-runs-everywhere.png

Modified: spark/css/custom.css
URL: 
http://svn.apache.org/viewvc/spark/css/custom.css?rev=1747764&r1=1747763&r2=1747764&view=diff
==
--- spark/css/custom.css (original)
+++ spark/css/custom.css Fri Jun 10 18:15:52 2016
@@ -1,7 +1,8 @@
 .tagline {
   display: inline-block;
   color: rgb(47, 164, 231);
-  padding-bottom: 5px;
+  padding-bottom: 7px;
+  padding-left: 10px;
   font-style: italic;
 }
 
@@ -204,6 +205,7 @@ footer a {
   display: inline-block;
   font-weight: 200;
   color: #333;
+  padding-left: 9px;
 }
 
 .subproject {
@@ -212,7 +214,7 @@ footer a {
 
 @media (min-width: 768px) {
   .subproject {
-font-size: 45px;
+font-size: 46px;
   }
 }
 

Modified: spark/site/css/custom.css
URL: 
http://svn.apache.org/viewvc/spark/site/css/custom.css?rev=1747764&r1=1747763&r2=1747764&view=diff
==
--- spark/site/css/custom.css (original)
+++ spark/site/css/custom.css Fri Jun 10 18:15:52 2016
@@ -1,7 +1,8 @@
 .tagline {
   display: inline-block;
   color: rgb(47, 164, 231);
-  padding-bottom: 5px;
+  padding-bottom: 7px;
+  padding-left: 10px;
   font-style: italic;
 }
 
@@ -204,6 +205,7 @@ footer a {
   display: inline-block;
   font-weight: 200;
   color: #333;
+  padding-left: 9px;
 }
 
 .subproject {
@@ -212,7 +214,7 @@ footer a {
 
 @media (min-width: 768px) {
   .subproject {
-font-size: 45px;
+font-size: 46px;
   }
 }
 

Modified: spark/site/documentation.html
URL: 
http://svn.apache.org/viewvc/spark/site/documentation.html?rev=1747764&r1=1747763&r2=1747764&view=diff
==
--- spark/site/documentation.html (original)
+++ spark/site/documentation.html Fri Jun 10 18:15:52 2016
@@ -249,13 +249,12 @@
 
 
 Meetup Talk Videos
-In addition to the videos listed below, you can also view http://www.meetup.com/spark-users/files/";>all slides from Bay Area 
meetups here.
+In addition to the videos listed below, you can also view http://www.meetup.com/spark-users/files/";>all slides from Bay Area 
meetups here.
 
   .video-meta-info {
 font-size: 0.95em;
   }
-
-
+
 
   http://www.youtube.com/watch?v=NUQ-8to2XAk&list=PL-x35fyliRwiP3YteXbnhk0QGOtYLBT3a";>Spark
 1.0 and Beyond (http://files.meetup.com/3138542/Spark%201.0%20Meetup.ppt";>slides) 
by Patrick Wendell, at Cisco in San Jose, 
2014-04-23
 

Modified: spark/site/images/spark-logo-reverse.eps
URL: 
http://svn.apache.org/viewvc/spark/site/images/spark-logo-reverse.eps?rev=1747764&r1=1747763&r2=1747764&view=diff
==
Binary files - no diff available.

Modified: spark/site/images/spark-logo-trademark.png
URL: 
http://svn.apache.org/viewvc/spark/site/images/spark-logo-trademark.png?rev=1747764&r1=1747763&r2=1747764&view=diff
==
Binary files - no diff available.

Modified: spark/site/images/spark-logo.eps
URL: 
http://svn.apache.org/viewvc/spark/site/images/spark-logo.eps?rev=1747764&r1=1747763&r2=1747764&view=diff
==
Binary files - no diff available.

Modified: spark/site/images/spark-logo.png
URL: 
http://svn.apache.org/viewvc/spark/site/images/spark-logo.png?rev=1747764&r1=1747763&r2=1747764&view=diff
==
Binary files - no diff available.

Modified: spark/site/images/spark-runs-everywhere.png
URL: 
http://svn.apache.org/viewvc/spark/site/images/spark-runs-everywhere.png?rev=1747764&r1=1747763&r2=1747764&view=diff
==
Binary files - no diff available.






svn commit: r1747763 - in /spark/images: spark-logo-reverse.eps spark-logo-trademark.png spark-logo.eps spark-logo.png spark-runs-everywhere.png

2016-06-10 Thread matei
Author: matei
Date: Fri Jun 10 18:15:35 2016
New Revision: 1747763

URL: http://svn.apache.org/viewvc?rev=1747763&view=rev
Log:
Version of logo with Apache

Modified:
spark/images/spark-logo-reverse.eps
spark/images/spark-logo-trademark.png
spark/images/spark-logo.eps
spark/images/spark-logo.png
spark/images/spark-runs-everywhere.png

Modified: spark/images/spark-logo-reverse.eps
URL: 
http://svn.apache.org/viewvc/spark/images/spark-logo-reverse.eps?rev=1747763&r1=1747762&r2=1747763&view=diff
==
Binary files - no diff available.

Modified: spark/images/spark-logo-trademark.png
URL: 
http://svn.apache.org/viewvc/spark/images/spark-logo-trademark.png?rev=1747763&r1=1747762&r2=1747763&view=diff
==
Binary files - no diff available.

Modified: spark/images/spark-logo.eps
URL: 
http://svn.apache.org/viewvc/spark/images/spark-logo.eps?rev=1747763&r1=1747762&r2=1747763&view=diff
==
Binary files - no diff available.

Modified: spark/images/spark-logo.png
URL: 
http://svn.apache.org/viewvc/spark/images/spark-logo.png?rev=1747763&r1=1747762&r2=1747763&view=diff
==
Binary files - no diff available.

Modified: spark/images/spark-runs-everywhere.png
URL: 
http://svn.apache.org/viewvc/spark/images/spark-runs-everywhere.png?rev=1747763&r1=1747762&r2=1747763&view=diff
==
Binary files - no diff available.






spark git commit: [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter

2016-06-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 47c2a265f -> 55a837246


[SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter

## What changes were proposed in this pull request?

This patch moves some code in `DataFrameWriter.insertInto` that belongs in the 
`Analyzer`.
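
The reordering that the analyzer rule now performs can be sketched in isolation as follows (the column names are made up for illustration):

```scala
// Data columns stay first and partition columns are moved to the end,
// mirroring the partition() + Project done by the rule.
val partCols    = Set("dt", "country")
val childOutput = Seq("id", "dt", "value", "country")   // attribute names of child.output
val (inputPartCols, inputDataCols) = childOutput.partition(partCols.contains)
val reordered = inputDataCols ++ inputPartCols          // Seq("id", "value", "dt", "country")
```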

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh 

Closes #13496 from viirya/move-analyzer-stuff.

(cherry picked from commit 0ec279ffdf92853965e327a9f0f6956cacb7a23e)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55a83724
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55a83724
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55a83724

Branch: refs/heads/branch-2.0
Commit: 55a83724632aa54e49aedbab8ddd21d010eca26d
Parents: 47c2a26
Author: Liang-Chi Hsieh 
Authored: Fri Jun 10 11:05:04 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 10 11:05:14 2016 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala | 17 ++---
 .../org/apache/spark/sql/DataFrameWriter.scala | 12 +---
 .../spark/sql/hive/execution/HiveQuerySuite.scala  |  4 ++--
 3 files changed, 17 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/55a83724/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4446140..a081357 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -452,6 +452,17 @@ class Analyzer(
 
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
   case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if 
child.resolved =>
+// A partitioned relation's schema can be different from the input 
logicalPlan, since
+// partition columns are all moved after data columns. We Project to 
adjust the ordering.
+val input = if (parts.nonEmpty) {
+  val (inputPartCols, inputDataCols) = child.output.partition { attr =>
+parts.contains(attr.name)
+  }
+  Project(inputDataCols ++ inputPartCols, child)
+} else {
+  child
+}
+
 val table = lookupTableFromCatalog(u)
 // adding the table's partitions or validate the query's partition info
 table match {
@@ -467,8 +478,8 @@ class Analyzer(
  |Requested partitions: ${parts.keys.mkString(",")}
  |Table partitions: 
${tablePartitionNames.mkString(",")}""".stripMargin)
   }
-  // Assume partition columns are correctly placed at the end of 
the child's output
-  i.copy(table = EliminateSubqueryAliases(table))
+  // Partition columns are already correctly placed at the end of 
the child's output
+  i.copy(table = EliminateSubqueryAliases(table), child = input)
 } else {
   // Set up the table's partition scheme with all dynamic 
partitions by moving partition
   // columns to the end of the column list, in partition order.
@@ -486,7 +497,7 @@ class Analyzer(
 child = Project(columns ++ partColumns, child))
 }
   case _ =>
-i.copy(table = EliminateSubqueryAliases(table))
+i.copy(table = EliminateSubqueryAliases(table), child = input)
 }
   case u: UnresolvedRelation =>
 val table = u.tableIdentifier

http://git-wip-us.apache.org/repos/asf/spark/blob/55a83724/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 32e2fdc..6ce59e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -505,21 +505,11 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 val partitions = normalizedParCols.map(_.map(col => col -> (None: 
Option[String])).toMap)
 val overwrite = mode == SaveMode.Overwrite
 
-// A partitioned relation's schema can be different from the input 
logicalPlan, since
-// partition columns are all moved after data columns. We Project to 
adjust the ordering.
-// TODO: this belongs to the analyzer.
-val input = normalizedParCols.map {

spark git commit: [SPARK-15866] Rename listAccumulator collectionAccumulator

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 0ec279ffd -> 254bc8c34


[SPARK-15866] Rename listAccumulator collectionAccumulator

## What changes were proposed in this pull request?
By Spark's naming convention, `SparkContext.listAccumulator` makes it sound like 
"list" is a verb and that the method should return a list of accumulators. This 
patch renames the method to `collectionAccumulator` and the class to 
`CollectionAccumulator`.
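
Usage is unchanged apart from the name; a minimal sketch (an active `SparkContext` named `sc` is assumed):

```scala
// Collect problematic inputs from the executors into a driver-side list.
val badRecords = sc.collectionAccumulator[String]("badRecords")

sc.parallelize(Seq("a", "", "b")).foreach { s =>
  if (s.isEmpty) badRecords.add(s)
}

println(badRecords.value)   // a java.util.List[String] with the collected elements
```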

## How was this patch tested?
Updated test case to reflect the names.

Author: Reynold Xin 

Closes #13594 from rxin/SPARK-15866.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/254bc8c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/254bc8c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/254bc8c3

Branch: refs/heads/master
Commit: 254bc8c34e70241508bdfc8ff42a65491f5280cd
Parents: 0ec279f
Author: Reynold Xin 
Authored: Fri Jun 10 11:08:39 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 11:08:39 2016 -0700

--
 .../main/scala/org/apache/spark/SparkContext.scala  | 16 
 .../scala/org/apache/spark/util/AccumulatorV2.scala | 15 ++-
 .../org/apache/spark/util/AccumulatorV2Suite.scala  |  2 +-
 .../execution/columnar/InMemoryTableScanExec.scala  |  8 
 4 files changed, 23 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/254bc8c3/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 33b11ed..230fabd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1340,21 +1340,21 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   }
 
   /**
-   * Create and register a list accumulator, which starts with empty list and 
accumulates inputs
-   * by adding them into the inner list.
+   * Create and register a [[CollectionAccumulator]], which starts with empty 
list and accumulates
+   * inputs by adding them into the list.
*/
-  def listAccumulator[T]: ListAccumulator[T] = {
-val acc = new ListAccumulator[T]
+  def collectionAccumulator[T]: CollectionAccumulator[T] = {
+val acc = new CollectionAccumulator[T]
 register(acc)
 acc
   }
 
   /**
-   * Create and register a list accumulator, which starts with empty list and 
accumulates inputs
-   * by adding them into the inner list.
+   * Create and register a [[CollectionAccumulator]], which starts with empty 
list and accumulates
+   * inputs by adding them into the list.
*/
-  def listAccumulator[T](name: String): ListAccumulator[T] = {
-val acc = new ListAccumulator[T]
+  def collectionAccumulator[T](name: String): CollectionAccumulator[T] = {
+val acc = new CollectionAccumulator[T]
 register(acc, name)
 acc
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/254bc8c3/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala 
b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 0b9a47c..044dd69 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -415,15 +415,20 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, 
jl.Double] {
 }
 
 
-class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
+/**
+ * An [[AccumulatorV2 accumulator]] for collecting a list of elements.
+ *
+ * @since 2.0.0
+ */
+class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
   private val _list: java.util.List[T] = new ArrayList[T]
 
   override def isZero: Boolean = _list.isEmpty
 
-  override def copyAndReset(): ListAccumulator[T] = new ListAccumulator
+  override def copyAndReset(): CollectionAccumulator[T] = new 
CollectionAccumulator
 
-  override def copy(): ListAccumulator[T] = {
-val newAcc = new ListAccumulator[T]
+  override def copy(): CollectionAccumulator[T] = {
+val newAcc = new CollectionAccumulator[T]
 newAcc._list.addAll(_list)
 newAcc
   }
@@ -433,7 +438,7 @@ class ListAccumulator[T] extends AccumulatorV2[T, 
java.util.List[T]] {
   override def add(v: T): Unit = _list.add(v)
 
   override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other 
match {
-case o: ListAccumulator[T] => _list.addAll(o.value)
+case o: CollectionAccumulator[T] => _list.addAll(o.value)
 case _ => throw new UnsupportedOperationException(
   s"Cannot merge ${this.getClass.getName} with ${other.getClass.ge

spark git commit: [SPARK-15866] Rename listAccumulator collectionAccumulator

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 55a837246 -> 935b6e0e4


[SPARK-15866] Rename listAccumulator collectionAccumulator

## What changes were proposed in this pull request?
By Spark's naming convention, `SparkContext.listAccumulator` makes it sound like 
"list" is a verb and that the method should return a list of accumulators. This 
patch renames the method to `collectionAccumulator` and the class to 
`CollectionAccumulator`.

## How was this patch tested?
Updated test case to reflect the names.

Author: Reynold Xin 

Closes #13594 from rxin/SPARK-15866.

(cherry picked from commit 254bc8c34e70241508bdfc8ff42a65491f5280cd)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/935b6e0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/935b6e0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/935b6e0e

Branch: refs/heads/branch-2.0
Commit: 935b6e0e48e258f447622033b512f7ba5d83da69
Parents: 55a8372
Author: Reynold Xin 
Authored: Fri Jun 10 11:08:39 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 11:08:47 2016 -0700

--
 .../main/scala/org/apache/spark/SparkContext.scala  | 16 
 .../scala/org/apache/spark/util/AccumulatorV2.scala | 15 ++-
 .../org/apache/spark/util/AccumulatorV2Suite.scala  |  2 +-
 .../execution/columnar/InMemoryTableScanExec.scala  |  8 
 4 files changed, 23 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/935b6e0e/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 33b11ed..230fabd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1340,21 +1340,21 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   }
 
   /**
-   * Create and register a list accumulator, which starts with empty list and 
accumulates inputs
-   * by adding them into the inner list.
+   * Create and register a [[CollectionAccumulator]], which starts with empty 
list and accumulates
+   * inputs by adding them into the list.
*/
-  def listAccumulator[T]: ListAccumulator[T] = {
-val acc = new ListAccumulator[T]
+  def collectionAccumulator[T]: CollectionAccumulator[T] = {
+val acc = new CollectionAccumulator[T]
 register(acc)
 acc
   }
 
   /**
-   * Create and register a list accumulator, which starts with empty list and 
accumulates inputs
-   * by adding them into the inner list.
+   * Create and register a [[CollectionAccumulator]], which starts with empty 
list and accumulates
+   * inputs by adding them into the list.
*/
-  def listAccumulator[T](name: String): ListAccumulator[T] = {
-val acc = new ListAccumulator[T]
+  def collectionAccumulator[T](name: String): CollectionAccumulator[T] = {
+val acc = new CollectionAccumulator[T]
 register(acc, name)
 acc
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/935b6e0e/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala 
b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 0b9a47c..044dd69 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -415,15 +415,20 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, 
jl.Double] {
 }
 
 
-class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
+/**
+ * An [[AccumulatorV2 accumulator]] for collecting a list of elements.
+ *
+ * @since 2.0.0
+ */
+class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
   private val _list: java.util.List[T] = new ArrayList[T]
 
   override def isZero: Boolean = _list.isEmpty
 
-  override def copyAndReset(): ListAccumulator[T] = new ListAccumulator
+  override def copyAndReset(): CollectionAccumulator[T] = new 
CollectionAccumulator
 
-  override def copy(): ListAccumulator[T] = {
-val newAcc = new ListAccumulator[T]
+  override def copy(): CollectionAccumulator[T] = {
+val newAcc = new CollectionAccumulator[T]
 newAcc._list.addAll(_list)
 newAcc
   }
@@ -433,7 +438,7 @@ class ListAccumulator[T] extends AccumulatorV2[T, 
java.util.List[T]] {
   override def add(v: T): Unit = _list.add(v)
 
   override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other 
match {
-case o: ListAccumulator[T] => _list.addAll(o.value)
+case o: CollectionAccumulator[T] => _list.addAll(o.value)
 case _ => throw 

spark git commit: [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter

2016-06-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master abdb5d42c -> 0ec279ffd


[SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter

## What changes were proposed in this pull request?

This patch moves some code in `DataFrameWriter.insertInto` that belongs in the 
`Analyzer`.

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh 

Closes #13496 from viirya/move-analyzer-stuff.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ec279ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ec279ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ec279ff

Branch: refs/heads/master
Commit: 0ec279ffdf92853965e327a9f0f6956cacb7a23e
Parents: abdb5d4
Author: Liang-Chi Hsieh 
Authored: Fri Jun 10 11:05:04 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 10 11:05:04 2016 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala | 17 ++---
 .../org/apache/spark/sql/DataFrameWriter.scala | 12 +---
 .../spark/sql/hive/execution/HiveQuerySuite.scala  |  4 ++--
 3 files changed, 17 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d1ca99f..58f3904 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -452,6 +452,17 @@ class Analyzer(
 
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
   case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if 
child.resolved =>
+// A partitioned relation's schema can be different from the input 
logicalPlan, since
+// partition columns are all moved after data columns. We Project to 
adjust the ordering.
+val input = if (parts.nonEmpty) {
+  val (inputPartCols, inputDataCols) = child.output.partition { attr =>
+parts.contains(attr.name)
+  }
+  Project(inputDataCols ++ inputPartCols, child)
+} else {
+  child
+}
+
 val table = lookupTableFromCatalog(u)
 // adding the table's partitions or validate the query's partition info
 table match {
@@ -467,8 +478,8 @@ class Analyzer(
  |Requested partitions: ${parts.keys.mkString(",")}
  |Table partitions: 
${tablePartitionNames.mkString(",")}""".stripMargin)
   }
-  // Assume partition columns are correctly placed at the end of 
the child's output
-  i.copy(table = EliminateSubqueryAliases(table))
+  // Partition columns are already correctly placed at the end of 
the child's output
+  i.copy(table = EliminateSubqueryAliases(table), child = input)
 } else {
   // Set up the table's partition scheme with all dynamic 
partitions by moving partition
   // columns to the end of the column list, in partition order.
@@ -486,7 +497,7 @@ class Analyzer(
 child = Project(columns ++ partColumns, child))
 }
   case _ =>
-i.copy(table = EliminateSubqueryAliases(table))
+i.copy(table = EliminateSubqueryAliases(table), child = input)
 }
   case u: UnresolvedRelation =>
 val table = u.tableIdentifier

http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 32e2fdc..6ce59e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -505,21 +505,11 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 val partitions = normalizedParCols.map(_.map(col => col -> (None: 
Option[String])).toMap)
 val overwrite = mode == SaveMode.Overwrite
 
-// A partitioned relation's schema can be different from the input 
logicalPlan, since
-// partition columns are all moved after data columns. We Project to 
adjust the ordering.
-// TODO: this belongs to the analyzer.
-val input = normalizedParCols.map { parCols =>
-  val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { 
attr =>
- 

spark git commit: [SPARK-15812][SQ][STREAMING] Added support for sorting after streaming aggregation with complete mode

2016-06-10 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 54b4763d2 -> 47c2a265f


[SPARK-15812][SQ][STREAMING] Added support for sorting after streaming 
aggregation with complete mode

## What changes were proposed in this pull request?

When the output mode is complete, the output of a streaming aggregation 
essentially contains the complete aggregates every time, so it is no different 
from a batch dataset within an incremental execution, and other non-streaming 
operations should be supported on it. This PR only adds support for sorting, as 
it is commonly useful functionality; support for other operations will come later.
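
A minimal sketch of what this enables (the names, the word-count shape, the `console` sink, and the exact writer calls below are assumptions for this development snapshot, not taken from the patch):

```scala
import org.apache.spark.sql.functions._

// `lines` is assumed to be a streaming Dataset[String], e.g. created via
// spark.read.format(...).stream(...).
val wordCounts = lines
  .select(explode(split(col("value"), " ")).as("word"))
  .groupBy("word").count()
  .orderBy(col("count").desc)   // sorting after the streaming aggregation, now allowed

wordCounts.write
  .outputMode("complete")       // complete mode is what makes the sort legal
  .format("console")
  .queryName("sorted_word_counts")
  .startStream()
```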

## How was this patch tested?
Additional unit tests.

Author: Tathagata Das 

Closes #13549 from tdas/SPARK-15812.

(cherry picked from commit abdb5d42c5802c8f60876aa1285c803d02881258)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47c2a265
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47c2a265
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47c2a265

Branch: refs/heads/branch-2.0
Commit: 47c2a265fbdb91cf5684f0d6342869ca08cb2d27
Parents: 54b4763
Author: Tathagata Das 
Authored: Fri Jun 10 10:48:28 2016 -0700
Committer: Tathagata Das 
Committed: Fri Jun 10 10:48:38 2016 -0700

--
 .../analysis/UnsupportedOperationChecker.scala  | 61 
 .../analysis/UnsupportedOperationsSuite.scala   | 17 +-
 .../apache/spark/sql/streaming/StreamTest.scala | 24 +---
 .../streaming/StreamingAggregationSuite.scala   | 25 
 4 files changed, 95 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/47c2a265/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 8373fa3..689e016 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -43,6 +43,41 @@ object UnsupportedOperationChecker {
 "Queries without streaming sources cannot be executed with 
write.startStream()")(plan)
 }
 
+// Disallow multiple streaming aggregations
+val aggregates = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming 
=> a }
+
+if (aggregates.size > 1) {
+  throwError(
+"Multiple streaming aggregations are not supported with " +
+  "streaming DataFrames/Datasets")(plan)
+}
+
+// Disallow some output mode
+outputMode match {
+  case InternalOutputModes.Append if aggregates.nonEmpty =>
+throwError(
+  s"$outputMode output mode not supported when there are streaming 
aggregations on " +
+s"streaming DataFrames/DataSets")(plan)
+
+  case InternalOutputModes.Complete | InternalOutputModes.Update if 
aggregates.isEmpty =>
+throwError(
+  s"$outputMode output mode not supported when there are no streaming 
aggregations on " +
+s"streaming DataFrames/Datasets")(plan)
+
+  case _ =>
+}
+
+/**
+ * Whether the subplan will contain complete data or incremental data in 
every incremental
+ * execution. Some operations may be allowed only when the child logical 
plan gives complete
+ * data.
+ */
+def containsCompleteData(subplan: LogicalPlan): Boolean = {
+  val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => 
a }
+  // Either the subplan has no streaming source, or it has aggregation 
with Complete mode
+  !subplan.isStreaming || (aggs.nonEmpty && outputMode == 
InternalOutputModes.Complete)
+}
+
 plan.foreachUp { implicit subPlan =>
 
   // Operations that cannot exists anywhere in a streaming plan
@@ -107,8 +142,9 @@ object UnsupportedOperationChecker {
 case GlobalLimit(_, _) | LocalLimit(_, _) if 
subPlan.children.forall(_.isStreaming) =>
   throwError("Limits are not supported on streaming 
DataFrames/Datasets")
 
-case Sort(_, _, _) | SortPartitions(_, _) if 
subPlan.children.forall(_.isStreaming) =>
-  throwError("Sorting is not supported on streaming 
DataFrames/Datasets")
+case Sort(_, _, _) | SortPartitions(_, _) if 
!containsCompleteData(subPlan) =>
+  throwError("Sorting is not supported on streaming 
DataFrames/Datasets, unless it is on" +
+"aggregated DataFrame/Dataset in Complete mode")
 
case Sample(_, _, _, _, child) if child.isStreaming =>

spark git commit: [SPARK-15812][SQL][STREAMING] Added support for sorting after streaming aggregation with complete mode

2016-06-10 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master cdd7f5a57 -> abdb5d42c


[SPARK-15812][SQL][STREAMING] Added support for sorting after streaming aggregation with complete mode

## What changes were proposed in this pull request?

When the output mode is Complete, the output of a streaming aggregation contains the complete aggregates on every trigger, so it is effectively a batch dataset within an incremental execution, and other non-streaming operations should be supported on it. This PR adds support only for sorting, since it is a commonly useful operation; support for other operations will come later.

## How was this patch tested?
Additional unit tests.

Author: Tathagata Das 

Closes #13549 from tdas/SPARK-15812.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abdb5d42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abdb5d42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abdb5d42

Branch: refs/heads/master
Commit: abdb5d42c5802c8f60876aa1285c803d02881258
Parents: cdd7f5a
Author: Tathagata Das 
Authored: Fri Jun 10 10:48:28 2016 -0700
Committer: Tathagata Das 
Committed: Fri Jun 10 10:48:28 2016 -0700

--
 .../analysis/UnsupportedOperationChecker.scala  | 61 
 .../analysis/UnsupportedOperationsSuite.scala   | 17 +-
 .../apache/spark/sql/streaming/StreamTest.scala | 24 +---
 .../streaming/StreamingAggregationSuite.scala   | 25 
 4 files changed, 95 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/abdb5d42/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 8373fa3..689e016 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -43,6 +43,41 @@ object UnsupportedOperationChecker {
 "Queries without streaming sources cannot be executed with 
write.startStream()")(plan)
 }
 
+// Disallow multiple streaming aggregations
+val aggregates = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming 
=> a }
+
+if (aggregates.size > 1) {
+  throwError(
+"Multiple streaming aggregations are not supported with " +
+  "streaming DataFrames/Datasets")(plan)
+}
+
+// Disallow some output mode
+outputMode match {
+  case InternalOutputModes.Append if aggregates.nonEmpty =>
+throwError(
+  s"$outputMode output mode not supported when there are streaming 
aggregations on " +
+s"streaming DataFrames/DataSets")(plan)
+
+  case InternalOutputModes.Complete | InternalOutputModes.Update if 
aggregates.isEmpty =>
+throwError(
+  s"$outputMode output mode not supported when there are no streaming 
aggregations on " +
+s"streaming DataFrames/Datasets")(plan)
+
+  case _ =>
+}
+
+/**
+ * Whether the subplan will contain complete data or incremental data in 
every incremental
+ * execution. Some operations may be allowed only when the child logical 
plan gives complete
+ * data.
+ */
+def containsCompleteData(subplan: LogicalPlan): Boolean = {
+  val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => 
a }
+  // Either the subplan has no streaming source, or it has aggregation 
with Complete mode
+  !subplan.isStreaming || (aggs.nonEmpty && outputMode == 
InternalOutputModes.Complete)
+}
+
 plan.foreachUp { implicit subPlan =>
 
   // Operations that cannot exists anywhere in a streaming plan
@@ -107,8 +142,9 @@ object UnsupportedOperationChecker {
 case GlobalLimit(_, _) | LocalLimit(_, _) if 
subPlan.children.forall(_.isStreaming) =>
   throwError("Limits are not supported on streaming 
DataFrames/Datasets")
 
-case Sort(_, _, _) | SortPartitions(_, _) if 
subPlan.children.forall(_.isStreaming) =>
-  throwError("Sorting is not supported on streaming 
DataFrames/Datasets")
+case Sort(_, _, _) | SortPartitions(_, _) if 
!containsCompleteData(subPlan) =>
+  throwError("Sorting is not supported on streaming 
DataFrames/Datasets, unless it is on" +
+"aggregated DataFrame/Dataset in Complete mode")
 
 case Sample(_, _, _, _, child) if child.isStreaming =>
   throwError("Sampling is not supported on streaming 
DataFrames/D

spark git commit: [SPARK-15837][ML][PYSPARK] Word2vec python add maxsentence parameter

2016-06-10 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6709ce1ae -> 54b4763d2


[SPARK-15837][ML][PYSPARK] Word2vec python add maxsentence parameter

## What changes were proposed in this pull request?

Add the maxSentenceLength parameter to the Python Word2Vec API.
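
As a usage sketch (not part of the patch), the parameter can be passed to the constructor or set via the new setter; the SparkSession `spark` and the toy corpus below are assumed for illustration.

```python
from pyspark.ml.feature import Word2Vec

# Hypothetical toy corpus: one column "text" containing arrays of words.
docs = spark.createDataFrame(
    [(["spark", "ml", "word2vec"],),
     (["long", "sentence"] * 800,)],   # a 1600-word "sentence"
    ["text"])

# maxSentenceLength (new in this patch) splits sentences longer than the
# threshold into chunks of at most 500 words before training.
w2v = Word2Vec(vectorSize=16, minCount=1, inputCol="text", outputCol="vec",
               maxSentenceLength=500)
model = w2v.fit(docs)
model.transform(docs).show(truncate=False)
```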

## How was this patch tested?

Existing test.

Author: WeichenXu 

Closes #13578 from WeichenXu123/word2vec_python_add_maxsentence.

(cherry picked from commit cdd7f5a57a21d4a8f93456d149f65859c96190cf)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54b4763d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54b4763d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54b4763d

Branch: refs/heads/branch-2.0
Commit: 54b4763d295d6aeab6105d0430470343dd4ca3a3
Parents: 6709ce1
Author: WeichenXu 
Authored: Fri Jun 10 12:26:53 2016 +0100
Committer: Sean Owen 
Committed: Fri Jun 10 12:27:04 2016 +0100

--
 python/pyspark/ml/feature.py | 29 -
 1 file changed, 24 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/54b4763d/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index ebe1300..bfb2fb7 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2244,28 +2244,33 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, 
HasSeed, HasInputCol, Has
 windowSize = Param(Params._dummy(), "windowSize",
"the window size (context words from [-window, 
window]). Default value is 5",
typeConverter=TypeConverters.toInt)
+maxSentenceLength = Param(Params._dummy(), "maxSentenceLength",
+  "Maximum length (in words) of each sentence in 
the input data. " +
+  "Any sentence longer than this threshold will " +
+  "be divided into chunks up to the size.",
+  typeConverter=TypeConverters.toInt)
 
 @keyword_only
 def __init__(self, vectorSize=100, minCount=5, numPartitions=1, 
stepSize=0.025, maxIter=1,
- seed=None, inputCol=None, outputCol=None, windowSize=5):
+ seed=None, inputCol=None, outputCol=None, windowSize=5, 
maxSentenceLength=1000):
 """
 __init__(self, vectorSize=100, minCount=5, numPartitions=1, 
stepSize=0.025, maxIter=1, \
- seed=None, inputCol=None, outputCol=None, windowSize=5)
+ seed=None, inputCol=None, outputCol=None, windowSize=5, 
maxSentenceLength=1000)
 """
 super(Word2Vec, self).__init__()
 self._java_obj = 
self._new_java_obj("org.apache.spark.ml.feature.Word2Vec", self.uid)
 self._setDefault(vectorSize=100, minCount=5, numPartitions=1, 
stepSize=0.025, maxIter=1,
- seed=None, windowSize=5)
+ seed=None, windowSize=5, maxSentenceLength=1000)
 kwargs = self.__init__._input_kwargs
 self.setParams(**kwargs)
 
 @keyword_only
 @since("1.4.0")
 def setParams(self, vectorSize=100, minCount=5, numPartitions=1, 
stepSize=0.025, maxIter=1,
-  seed=None, inputCol=None, outputCol=None, windowSize=5):
+  seed=None, inputCol=None, outputCol=None, windowSize=5, 
maxSentenceLength=1000):
 """
 setParams(self, minCount=5, numPartitions=1, stepSize=0.025, 
maxIter=1, seed=None, \
- inputCol=None, outputCol=None, windowSize=5)
+ inputCol=None, outputCol=None, windowSize=5, 
maxSentenceLength=1000)
 Sets params for this Word2Vec.
 """
 kwargs = self.setParams._input_kwargs
@@ -2327,6 +2332,20 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, 
HasSeed, HasInputCol, Has
 """
 return self.getOrDefault(self.windowSize)
 
+@since("2.0.0")
+def setMaxSentenceLength(self, value):
+"""
+Sets the value of :py:attr:`maxSentenceLength`.
+"""
+return self._set(maxSentenceLength=value)
+
+@since("2.0.0")
+def getMaxSentenceLength(self):
+"""
+Gets the value of maxSentenceLength or its default value.
+"""
+return self.getOrDefault(self.maxSentenceLength)
+
 def _create_model(self, java_model):
 return Word2VecModel(java_model)
 





spark git commit: [SPARK-15837][ML][PYSPARK] Word2vec python add maxsentence parameter

2016-06-10 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 16ca32eac -> cdd7f5a57


[SPARK-15837][ML][PYSPARK] Word2vec python add maxsentence parameter

## What changes were proposed in this pull request?

Add the maxSentenceLength parameter to the Python Word2Vec API.

## How was this patch tested?

Existing test.

Author: WeichenXu 

Closes #13578 from WeichenXu123/word2vec_python_add_maxsentence.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cdd7f5a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cdd7f5a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cdd7f5a5

Branch: refs/heads/master
Commit: cdd7f5a57a21d4a8f93456d149f65859c96190cf
Parents: 16ca32e
Author: WeichenXu 
Authored: Fri Jun 10 12:26:53 2016 +0100
Committer: Sean Owen 
Committed: Fri Jun 10 12:26:53 2016 +0100

--
 python/pyspark/ml/feature.py | 29 -
 1 file changed, 24 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cdd7f5a5/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index ebe1300..bfb2fb7 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2244,28 +2244,33 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, 
HasSeed, HasInputCol, Has
 windowSize = Param(Params._dummy(), "windowSize",
"the window size (context words from [-window, 
window]). Default value is 5",
typeConverter=TypeConverters.toInt)
+maxSentenceLength = Param(Params._dummy(), "maxSentenceLength",
+  "Maximum length (in words) of each sentence in 
the input data. " +
+  "Any sentence longer than this threshold will " +
+  "be divided into chunks up to the size.",
+  typeConverter=TypeConverters.toInt)
 
 @keyword_only
 def __init__(self, vectorSize=100, minCount=5, numPartitions=1, 
stepSize=0.025, maxIter=1,
- seed=None, inputCol=None, outputCol=None, windowSize=5):
+ seed=None, inputCol=None, outputCol=None, windowSize=5, 
maxSentenceLength=1000):
 """
 __init__(self, vectorSize=100, minCount=5, numPartitions=1, 
stepSize=0.025, maxIter=1, \
- seed=None, inputCol=None, outputCol=None, windowSize=5)
+ seed=None, inputCol=None, outputCol=None, windowSize=5, 
maxSentenceLength=1000)
 """
 super(Word2Vec, self).__init__()
 self._java_obj = 
self._new_java_obj("org.apache.spark.ml.feature.Word2Vec", self.uid)
 self._setDefault(vectorSize=100, minCount=5, numPartitions=1, 
stepSize=0.025, maxIter=1,
- seed=None, windowSize=5)
+ seed=None, windowSize=5, maxSentenceLength=1000)
 kwargs = self.__init__._input_kwargs
 self.setParams(**kwargs)
 
 @keyword_only
 @since("1.4.0")
 def setParams(self, vectorSize=100, minCount=5, numPartitions=1, 
stepSize=0.025, maxIter=1,
-  seed=None, inputCol=None, outputCol=None, windowSize=5):
+  seed=None, inputCol=None, outputCol=None, windowSize=5, 
maxSentenceLength=1000):
 """
 setParams(self, minCount=5, numPartitions=1, stepSize=0.025, 
maxIter=1, seed=None, \
- inputCol=None, outputCol=None, windowSize=5)
+ inputCol=None, outputCol=None, windowSize=5, 
maxSentenceLength=1000)
 Sets params for this Word2Vec.
 """
 kwargs = self.setParams._input_kwargs
@@ -2327,6 +2332,20 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, 
HasSeed, HasInputCol, Has
 """
 return self.getOrDefault(self.windowSize)
 
+@since("2.0.0")
+def setMaxSentenceLength(self, value):
+"""
+Sets the value of :py:attr:`maxSentenceLength`.
+"""
+return self._set(maxSentenceLength=value)
+
+@since("2.0.0")
+def getMaxSentenceLength(self):
+"""
+Gets the value of maxSentenceLength or its default value.
+"""
+return self.getOrDefault(self.maxSentenceLength)
+
 def _create_model(self, java_model):
 return Word2VecModel(java_model)
 





spark git commit: [SPARK-15823][PYSPARK][ML] Add @property for 'accuracy' in MulticlassMetrics

2016-06-10 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 84a8421e5 -> 6709ce1ae


[SPARK-15823][PYSPARK][ML] Add @property for 'accuracy' in MulticlassMetrics

## What changes were proposed in this pull request?
`accuracy` should be decorated with `@property` so that it is consistent with the other members of `pyspark.MulticlassMetrics`, such as `weightedPrecision` and `weightedRecall`.
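
A short usage sketch (not from the patch) of the resulting API, assuming an existing SparkContext `sc` (e.g. from the PySpark shell) and hypothetical predictions:

```python
from pyspark.mllib.evaluation import MulticlassMetrics

# Hypothetical (prediction, label) pairs; 3 of the 4 predictions are correct.
predictionAndLabels = sc.parallelize(
    [(0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (2.0, 2.0)])

metrics = MulticlassMetrics(predictionAndLabels)

# After this patch, accuracy is read as an attribute, in line with the other
# summary metrics, instead of being called as a method.
print(metrics.accuracy)           # 0.75
print(metrics.weightedRecall)
```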

## How was this patch tested?
manual tests

Author: Zheng RuiFeng 

Closes #13560 from zhengruifeng/add_accuracy_property.

(cherry picked from commit 16ca32eace39c423224b0ec25922038fd45c501a)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6709ce1a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6709ce1a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6709ce1a

Branch: refs/heads/branch-2.0
Commit: 6709ce1aea4a8d7438722f48fd7f2ed0fc7fa5be
Parents: 84a8421
Author: Zheng RuiFeng 
Authored: Fri Jun 10 10:09:19 2016 +0100
Committer: Sean Owen 
Committed: Fri Jun 10 10:09:29 2016 +0100

--
 python/pyspark/mllib/evaluation.py | 7 ++-
 1 file changed, 2 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6709ce1a/python/pyspark/mllib/evaluation.py
--
diff --git a/python/pyspark/mllib/evaluation.py 
b/python/pyspark/mllib/evaluation.py
index 2eaac87..fc2a0b3 100644
--- a/python/pyspark/mllib/evaluation.py
+++ b/python/pyspark/mllib/evaluation.py
@@ -179,11 +179,7 @@ class MulticlassMetrics(JavaModelWrapper):
 1.0...
 >>> metrics.fMeasure(0.0, 2.0)
 0.52...
->>> metrics.precision()
-0.66...
->>> metrics.recall()
-0.66...
->>> metrics.accuracy()
+>>> metrics.accuracy
 0.66...
 >>> metrics.weightedFalsePositiveRate
 0.19...
@@ -273,6 +269,7 @@ class MulticlassMetrics(JavaModelWrapper):
 else:
 return self.call("fMeasure", label, beta)
 
+@property
 @since('2.0.0')
 def accuracy(self):
 """





spark git commit: [SPARK-15823][PYSPARK][ML] Add @property for 'accuracy' in MulticlassMetrics

2016-06-10 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 675a73715 -> 16ca32eac


[SPARK-15823][PYSPARK][ML] Add @property for 'accuracy' in MulticlassMetrics

## What changes were proposed in this pull request?
`accuracy` should be decorated with `@property` so that it is consistent with the other members of `pyspark.MulticlassMetrics`, such as `weightedPrecision` and `weightedRecall`.

## How was this patch tested?
manual tests

Author: Zheng RuiFeng 

Closes #13560 from zhengruifeng/add_accuracy_property.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16ca32ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16ca32ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16ca32ea

Branch: refs/heads/master
Commit: 16ca32eace39c423224b0ec25922038fd45c501a
Parents: 675a737
Author: Zheng RuiFeng 
Authored: Fri Jun 10 10:09:19 2016 +0100
Committer: Sean Owen 
Committed: Fri Jun 10 10:09:19 2016 +0100

--
 python/pyspark/mllib/evaluation.py | 7 ++-
 1 file changed, 2 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/16ca32ea/python/pyspark/mllib/evaluation.py
--
diff --git a/python/pyspark/mllib/evaluation.py 
b/python/pyspark/mllib/evaluation.py
index 2eaac87..fc2a0b3 100644
--- a/python/pyspark/mllib/evaluation.py
+++ b/python/pyspark/mllib/evaluation.py
@@ -179,11 +179,7 @@ class MulticlassMetrics(JavaModelWrapper):
 1.0...
 >>> metrics.fMeasure(0.0, 2.0)
 0.52...
->>> metrics.precision()
-0.66...
->>> metrics.recall()
-0.66...
->>> metrics.accuracy()
+>>> metrics.accuracy
 0.66...
 >>> metrics.weightedFalsePositiveRate
 0.19...
@@ -273,6 +269,7 @@ class MulticlassMetrics(JavaModelWrapper):
 else:
 return self.call("fMeasure", label, beta)
 
+@property
 @since('2.0.0')
 def accuracy(self):
 """





spark git commit: [DOCUMENTATION] fixed groupby aggregation example for pyspark

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 00c310133 -> 675a73715


[DOCUMENTATION] fixed groupby aggregation example for pyspark

## What changes were proposed in this pull request?

Fix the documentation for the groupBy/agg example in Python.

## How was this patch tested?

The existing example in the documentation does not contain valid syntax (missing parenthesis) and does not use a `Column` in the expression for `agg()`.

After the fix, here is how I tested it:

```
In [1]: from pyspark.sql import Row

In [2]: import pyspark.sql.functions as func

In [3]: %cpaste
Pasting code; enter '--' alone on the line to stop or use Ctrl-D.
:records = [{'age': 19, 'department': 1, 'expense': 100},
: {'age': 20, 'department': 1, 'expense': 200},
: {'age': 21, 'department': 2, 'expense': 300},
: {'age': 22, 'department': 2, 'expense': 300},
: {'age': 23, 'department': 3, 'expense': 300}]
:--

In [4]: df = sqlContext.createDataFrame([Row(**d) for d in records])

In [5]: df.groupBy("department").agg(df["department"], func.max("age"), 
func.sum("expense")).show()

+--+--+++
|department|department|max(age)|sum(expense)|
+--+--+++
| 1| 1|  20| 300|
| 2| 2|  22| 600|
| 3| 3|  23| 300|
+--+--+++

Author: Mortada Mehyar 

Closes #13587 from mortada/groupby_agg_doc_fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/675a7371
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/675a7371
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/675a7371

Branch: refs/heads/master
Commit: 675a73715d3c8adb9d9a9dce5f76a2db5106790c
Parents: 00c3101
Author: Mortada Mehyar 
Authored: Fri Jun 10 00:23:34 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 00:23:34 2016 -0700

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/675a7371/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 940c1d7..efdf873 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2221,7 +2221,7 @@ import pyspark.sql.functions as func
 
 # In 1.3.x, in order for the grouping column "department" to show up,
 # it must be included explicitly as part of the agg function call.
-df.groupBy("department").agg("department"), func.max("age"), 
func.sum("expense"))
+df.groupBy("department").agg(df["department"], func.max("age"), 
func.sum("expense"))
 
 # In 1.4+, grouping column "department" is included automatically.
 df.groupBy("department").agg(func.max("age"), func.sum("expense"))





spark git commit: [DOCUMENTATION] fixed groupby aggregation example for pyspark

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 02ed7b536 -> 84a8421e5


[DOCUMENTATION] fixed groupby aggregation example for pyspark

## What changes were proposed in this pull request?

Fix the documentation for the groupBy/agg example in Python.

## How was this patch tested?

The existing example in the documentation does not contain valid syntax (missing parenthesis) and does not use a `Column` in the expression for `agg()`.

After the fix, here is how I tested it:

```
In [1]: from pyspark.sql import Row

In [2]: import pyspark.sql.functions as func

In [3]: %cpaste
Pasting code; enter '--' alone on the line to stop or use Ctrl-D.
:records = [{'age': 19, 'department': 1, 'expense': 100},
: {'age': 20, 'department': 1, 'expense': 200},
: {'age': 21, 'department': 2, 'expense': 300},
: {'age': 22, 'department': 2, 'expense': 300},
: {'age': 23, 'department': 3, 'expense': 300}]
:--

In [4]: df = sqlContext.createDataFrame([Row(**d) for d in records])

In [5]: df.groupBy("department").agg(df["department"], func.max("age"), 
func.sum("expense")).show()

+--+--+++
|department|department|max(age)|sum(expense)|
+--+--+++
| 1| 1|  20| 300|
| 2| 2|  22| 600|
| 3| 3|  23| 300|
+--+--+++

Author: Mortada Mehyar 

Closes #13587 from mortada/groupby_agg_doc_fix.

(cherry picked from commit 675a73715d3c8adb9d9a9dce5f76a2db5106790c)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84a8421e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84a8421e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84a8421e

Branch: refs/heads/branch-2.0
Commit: 84a8421e5cd5756cffb3d796117149c413204264
Parents: 02ed7b5
Author: Mortada Mehyar 
Authored: Fri Jun 10 00:23:34 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 00:23:41 2016 -0700

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/84a8421e/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 940c1d7..efdf873 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2221,7 +2221,7 @@ import pyspark.sql.functions as func
 
 # In 1.3.x, in order for the grouping column "department" to show up,
 # it must be included explicitly as part of the agg function call.
-df.groupBy("department").agg("department"), func.max("age"), 
func.sum("expense"))
+df.groupBy("department").agg(df["department"], func.max("age"), 
func.sum("expense"))
 
 # In 1.4+, grouping column "department" is included automatically.
 df.groupBy("department").agg(func.max("age"), func.sum("expense"))





spark git commit: [DOCUMENTATION] fixed groupby aggregation example for pyspark

2016-06-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 739d992f0 -> 393f4ba15


[DOCUMENTATION] fixed groupby aggregation example for pyspark

## What changes were proposed in this pull request?

Fix the documentation for the groupBy/agg example in Python.

## How was this patch tested?

The existing example in the documentation does not contain valid syntax (missing parenthesis) and does not use a `Column` in the expression for `agg()`.

After the fix, here is how I tested it:

```
In [1]: from pyspark.sql import Row

In [2]: import pyspark.sql.functions as func

In [3]: %cpaste
Pasting code; enter '--' alone on the line to stop or use Ctrl-D.
:records = [{'age': 19, 'department': 1, 'expense': 100},
: {'age': 20, 'department': 1, 'expense': 200},
: {'age': 21, 'department': 2, 'expense': 300},
: {'age': 22, 'department': 2, 'expense': 300},
: {'age': 23, 'department': 3, 'expense': 300}]
:--

In [4]: df = sqlContext.createDataFrame([Row(**d) for d in records])

In [5]: df.groupBy("department").agg(df["department"], func.max("age"), 
func.sum("expense")).show()

+--+--+++
|department|department|max(age)|sum(expense)|
+--+--+++
| 1| 1|  20| 300|
| 2| 2|  22| 600|
| 3| 3|  23| 300|
+--+--+++

Author: Mortada Mehyar 

Closes #13587 from mortada/groupby_agg_doc_fix.

(cherry picked from commit 675a73715d3c8adb9d9a9dce5f76a2db5106790c)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/393f4ba1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/393f4ba1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/393f4ba1

Branch: refs/heads/branch-1.6
Commit: 393f4ba1516af47388e72310aee8dbbea9652134
Parents: 739d992
Author: Mortada Mehyar 
Authored: Fri Jun 10 00:23:34 2016 -0700
Committer: Reynold Xin 
Committed: Fri Jun 10 00:23:49 2016 -0700

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/393f4ba1/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 803701e..26511b5 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2248,7 +2248,7 @@ import pyspark.sql.functions as func
 
 # In 1.3.x, in order for the grouping column "department" to show up,
 # it must be included explicitly as part of the agg function call.
-df.groupBy("department").agg("department"), func.max("age"), 
func.sum("expense"))
+df.groupBy("department").agg(df["department"], func.max("age"), 
func.sum("expense"))
 
 # In 1.4+, grouping column "department" is included automatically.
 df.groupBy("department").agg(func.max("age"), func.sum("expense"))





spark git commit: [SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery

2016-06-10 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1371d5ece -> 02ed7b536


[SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user consuming data 
in ContinuousQuery

## What changes were proposed in this pull request?

* Add DataFrameWriter.foreach to allow the user to consume data in a ContinuousQuery
  * ForeachWriter is the interface through which the user consumes partitions of data
* Add a type parameter T to DataFrameWriter

Usage
```Scala
val ds = spark.readstream().as[String]
ds.write
 .queryName(...)
.option("checkpointLocation", ...)
.foreach(new ForeachWriter[Int] {
  def open(partitionId: Long, version: Long): Boolean = {
 // prepare some resources for a partition
 // check `version` if possible and return `false` if this is a 
duplicated data to skip the data processing.
  }

  override def process(value: Int): Unit = {
  // process data
  }

  def close(errorOrNull: Throwable): Unit = {
 // release resources for a partition
 // check `errorOrNull` and handle the error if necessary.
  }
})
```

## How was this patch tested?

New unit tests.

Author: Shixiong Zhu 

Closes #13342 from zsxwing/foreach.

(cherry picked from commit 00c310133df4f3893dd90d801168c2ab9841b102)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02ed7b53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02ed7b53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02ed7b53

Branch: refs/heads/branch-2.0
Commit: 02ed7b536f54f46de9b10a8a4ea79544a7a813bf
Parents: 1371d5e
Author: Shixiong Zhu 
Authored: Fri Jun 10 00:11:46 2016 -0700
Committer: Tathagata Das 
Committed: Fri Jun 10 00:11:56 2016 -0700

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 150 ++-
 .../scala/org/apache/spark/sql/Dataset.scala|   2 +-
 .../org/apache/spark/sql/ForeachWriter.scala| 105 +
 .../sql/execution/streaming/ForeachSink.scala   |  53 +++
 .../execution/streaming/ForeachSinkSuite.scala  | 141 +
 .../spark/sql/sources/BucketedReadSuite.scala   |   4 +-
 6 files changed, 413 insertions(+), 42 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/02ed7b53/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1dd8818..32e2fdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -29,7 +29,7 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
 import org.apache.spark.sql.execution.datasources.{BucketSpec, 
CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
-import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, 
StreamExecution}
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, 
ProcessingTime, Trigger}
 import org.apache.spark.util.Utils
@@ -40,7 +40,9 @@ import org.apache.spark.util.Utils
  *
  * @since 1.4.0
  */
-final class DataFrameWriter private[sql](df: DataFrame) {
+final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
 
   /**
* Specifies the behavior when data or table already exists. Options include:
@@ -51,7 +53,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* @since 1.4.0
*/
-  def mode(saveMode: SaveMode): DataFrameWriter = {
+  def mode(saveMode: SaveMode): DataFrameWriter[T] = {
 // mode() is used for non-continuous queries
 // outputMode() is used for continuous queries
 assertNotStreaming("mode() can only be called on non-continuous queries")
@@ -68,7 +70,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* @since 1.4.0
*/
-  def mode(saveMode: String): DataFrameWriter = {
+  def mode(saveMode: String): DataFrameWriter[T] = {
 // mode() is used for non-continuous queries
 // outputMode() is used for continuous queries
 assertNotStreaming("mode() can only be called on non-continuous queries")
@@ -93,7 +95,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @since 2.0.0
*/
   @Experimental
-  def outputMode(outputMode: OutputMode): DataFrameWriter = {
+  def outputMode(outputMode: OutputMode): DataFrameWriter[T] = {

spark git commit: [SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery

2016-06-10 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 5a3533e77 -> 00c310133


[SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user consuming data 
in ContinuousQuery

## What changes were proposed in this pull request?

* Add DataFrameWriter.foreach to allow the user to consume data in a ContinuousQuery
  * ForeachWriter is the interface through which the user consumes partitions of data
* Add a type parameter T to DataFrameWriter

Usage
```Scala
val ds = spark.readstream().as[String]
ds.write
 .queryName(...)
.option("checkpointLocation", ...)
.foreach(new ForeachWriter[Int] {
  def open(partitionId: Long, version: Long): Boolean = {
 // prepare some resources for a partition
 // check `version` if possible and return `false` if this is a 
duplicated data to skip the data processing.
  }

  override def process(value: Int): Unit = {
  // process data
  }

  def close(errorOrNull: Throwable): Unit = {
 // release resources for a partition
 // check `errorOrNull` and handle the error if necessary.
  }
})
```

## How was this patch tested?

New unit tests.

Author: Shixiong Zhu 

Closes #13342 from zsxwing/foreach.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00c31013
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00c31013
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00c31013

Branch: refs/heads/master
Commit: 00c310133df4f3893dd90d801168c2ab9841b102
Parents: 5a3533e
Author: Shixiong Zhu 
Authored: Fri Jun 10 00:11:46 2016 -0700
Committer: Tathagata Das 
Committed: Fri Jun 10 00:11:46 2016 -0700

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 150 ++-
 .../scala/org/apache/spark/sql/Dataset.scala|   2 +-
 .../org/apache/spark/sql/ForeachWriter.scala| 105 +
 .../sql/execution/streaming/ForeachSink.scala   |  53 +++
 .../execution/streaming/ForeachSinkSuite.scala  | 141 +
 .../spark/sql/sources/BucketedReadSuite.scala   |   4 +-
 6 files changed, 413 insertions(+), 42 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/00c31013/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1dd8818..32e2fdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -29,7 +29,7 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
 import org.apache.spark.sql.execution.datasources.{BucketSpec, 
CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
-import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, 
StreamExecution}
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, 
ProcessingTime, Trigger}
 import org.apache.spark.util.Utils
@@ -40,7 +40,9 @@ import org.apache.spark.util.Utils
  *
  * @since 1.4.0
  */
-final class DataFrameWriter private[sql](df: DataFrame) {
+final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
 
   /**
* Specifies the behavior when data or table already exists. Options include:
@@ -51,7 +53,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* @since 1.4.0
*/
-  def mode(saveMode: SaveMode): DataFrameWriter = {
+  def mode(saveMode: SaveMode): DataFrameWriter[T] = {
 // mode() is used for non-continuous queries
 // outputMode() is used for continuous queries
 assertNotStreaming("mode() can only be called on non-continuous queries")
@@ -68,7 +70,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* @since 1.4.0
*/
-  def mode(saveMode: String): DataFrameWriter = {
+  def mode(saveMode: String): DataFrameWriter[T] = {
 // mode() is used for non-continuous queries
 // outputMode() is used for continuous queries
 assertNotStreaming("mode() can only be called on non-continuous queries")
@@ -93,7 +95,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @since 2.0.0
*/
   @Experimental
-  def outputMode(outputMode: OutputMode): DataFrameWriter = {
+  def outputMode(outputMode: OutputMode): DataFrameWriter[T] = {
 assertStreaming("outputMode() can only be called on continuous queries")