spark git commit: [DOC] bucketing is applicable to all file-based data sources

2016-12-21 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 7c5b7b3a2 -> 2e861df96


[DOC] bucketing is applicable to all file-based data sources

## What changes were proposed in this pull request?
Starting with Spark 2.1.0, the bucketing feature is available for all file-based data 
sources. This patch fixes some function docs that haven't yet been updated to 
reflect that.

## How was this patch tested?
N/A

Author: Reynold Xin 

Closes #16349 from rxin/ds-doc.
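
The API whose docs change here is `DataFrameWriter.bucketBy`/`sortBy`, used together
with `saveAsTable`. Below is a minimal sketch of bucketed output to a file-based
source; the table, column, and app names are illustrative, not taken from this patch.

```scala
// Minimal sketch (illustrative names): bucketed, sorted output with a
// file-based source. Bucketed output is written through saveAsTable rather
// than a plain save() to a path.
import org.apache.spark.sql.SparkSession

object BucketingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("bucketing-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")

    df.write
      .format("parquet")        // any file-based source, per this doc change
      .bucketBy(4, "id")        // hash rows into 4 buckets by "id"
      .sortBy("id")             // keep each bucket sorted by "id"
      .saveAsTable("bucketing_sketch_table")

    spark.stop()
  }
}
```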


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e861df9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e861df9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e861df9

Branch: refs/heads/master
Commit: 2e861df96eacd821edbbd9883121bff67611074f
Parents: 7c5b7b3
Author: Reynold Xin 
Authored: Wed Dec 21 23:46:33 2016 -0800
Committer: Reynold Xin 
Committed: Wed Dec 21 23:46:33 2016 -0800

--
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2e861df9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d33f7da..9c5660a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -150,7 +150,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
* predicates on the partitioned columns. In order for partitioning to work 
well, the number
* of distinct values in each column should typically be less than tens of 
thousands.
*
-   * This was initially applicable for Parquet but in 1.5+ covers JSON, text, 
ORC and avro as well.
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting Spark 2.1.0.
*
* @since 1.4.0
*/
@@ -164,7 +164,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
* Buckets the output by the given columns. If specified, the output is laid 
out on the file
* system similar to Hive's bucketing scheme.
*
-   * This is applicable for Parquet, JSON and ORC.
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting Spark 2.1.0.
*
* @since 2.0
*/
@@ -178,7 +178,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
   /**
* Sorts the output in each bucket by the given columns.
*
-   * This is applicable for Parquet, JSON and ORC.
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting Spark 2.1.0.
*
* @since 2.0
*/





spark git commit: [DOC] bucketing is applicable to all file-based data sources

2016-12-21 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 def3690f6 -> ec0d6e21e


[DOC] bucketing is applicable to all file-based data sources

## What changes were proposed in this pull request?
Starting with Spark 2.1.0, the bucketing feature is available for all file-based data 
sources. This patch fixes some function docs that haven't yet been updated to 
reflect that.

## How was this patch tested?
N/A

Author: Reynold Xin 

Closes #16349 from rxin/ds-doc.

(cherry picked from commit 2e861df96eacd821edbbd9883121bff67611074f)
Signed-off-by: Reynold Xin 
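
The first hunk below also touches the `partitionBy` doc. For contrast with bucketing,
here is a minimal sketch of partitioned output; the path, column, and app names are
illustrative, not taken from this patch.

```scala
// Minimal sketch (illustrative names): partitioned output creates one
// directory per distinct value of the partition column,
// e.g. .../by_country/country=US/part-...
import org.apache.spark.sql.SparkSession

object PartitioningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("partitioning-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("US", 1), ("DE", 2), ("US", 3)).toDF("country", "value")

    df.write
      .partitionBy("country")
      .json("/tmp/partitioning_sketch/by_country")

    spark.stop()
  }
}
```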


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec0d6e21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec0d6e21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec0d6e21

Branch: refs/heads/branch-2.1
Commit: ec0d6e21ed85164fd7eb519ec1d017497122c55c
Parents: def3690
Author: Reynold Xin 
Authored: Wed Dec 21 23:46:33 2016 -0800
Committer: Reynold Xin 
Committed: Wed Dec 21 23:46:38 2016 -0800

--
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ec0d6e21/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index fa8e8cb..44c407d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -150,7 +150,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
* predicates on the partitioned columns. In order for partitioning to work 
well, the number
* of distinct values in each column should typically be less than tens of 
thousands.
*
-   * This was initially applicable for Parquet but in 1.5+ covers JSON, text, 
ORC and avro as well.
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting Spark 2.1.0.
*
* @since 1.4.0
*/
@@ -164,7 +164,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
* Buckets the output by the given columns. If specified, the output is laid 
out on the file
* system similar to Hive's bucketing scheme.
*
-   * This is applicable for Parquet, JSON and ORC.
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting Spark 2.1.0.
*
* @since 2.0
*/
@@ -178,7 +178,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
   /**
* Sorts the output in each bucket by the given columns.
*
-   * This is applicable for Parquet, JSON and ORC.
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting Spark 2.1.0.
*
* @since 2.0
*/





spark git commit: [SQL] Minor readability improvement for partition handling code

2016-12-21 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 07e2a17d1 -> def3690f6


[SQL] Minor readability improvement for partition handling code

This patch includes minor changes to improve the readability of the partition handling 
code. While implementing a new feature, I found some of the naming / implicit type 
inference not as intuitive as it could be.

This patch should have no semantic change and the changes should be covered by 
existing test cases.

Author: Reynold Xin 

Closes #16378 from rxin/minor-fix.

(cherry picked from commit 7c5b7b3a2e5a7c1b2d0d8ce655840cad581e47ac)
Signed-off-by: Reynold Xin 
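
One of the readability changes in the diff below is annotating the result types of
non-private members (for example `supportsBatch: Boolean`) instead of relying on
inference. A small generic sketch of that convention; the class and member names are
illustrative, not Spark's.

```scala
// Generic sketch (illustrative names): spelling out result types on
// non-private members so readers see the type at the declaration site
// instead of inferring it from the initializer.
class ScanNode(conf: Map[String, String]) {
  // Before: the Boolean type had to be inferred from the right-hand side.
  // val supportsBatch = conf.getOrElse("vectorized", "false").toBoolean

  // After: the type is explicit for anyone reading or overriding the member.
  val supportsBatch: Boolean = conf.getOrElse("vectorized", "false").toBoolean
}
```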


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/def3690f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/def3690f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/def3690f

Branch: refs/heads/branch-2.1
Commit: def3690f6889979226478bf9c35a240d7e0662e6
Parents: 07e2a17
Author: Reynold Xin 
Authored: Thu Dec 22 15:29:56 2016 +0800
Committer: Reynold Xin 
Committed: Wed Dec 21 23:45:16 2016 -0800

--
 .../sql/execution/DataSourceScanExec.scala  |  7 +-
 .../datasources/CatalogFileIndex.scala  | 11 +--
 .../sql/execution/datasources/FileFormat.scala  |  3 +-
 .../execution/datasources/FileStatusCache.scala | 72 ++--
 4 files changed, 49 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/def3690f/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index e485b52..7616164 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -136,7 +136,7 @@ case class RowDataSourceScanExec(
  * @param outputSchema Output schema of the scan.
  * @param partitionFilters Predicates to use for partition pruning.
  * @param dataFilters Data source filters to use for filtering data within 
partitions.
- * @param metastoreTableIdentifier
+ * @param metastoreTableIdentifier identifier for the table in the metastore.
  */
 case class FileSourceScanExec(
 @transient relation: HadoopFsRelation,
@@ -147,10 +147,10 @@ case class FileSourceScanExec(
 override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
 
-  val supportsBatch = relation.fileFormat.supportBatch(
+  val supportsBatch: Boolean = relation.fileFormat.supportBatch(
 relation.sparkSession, StructType.fromAttributes(output))
 
-  val needsUnsafeRowConversion = if 
(relation.fileFormat.isInstanceOf[ParquetSource]) {
+  val needsUnsafeRowConversion: Boolean = if 
(relation.fileFormat.isInstanceOf[ParquetSource]) {
 
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
   } else {
 false
@@ -516,7 +516,6 @@ case class FileSourceScanExec(
 }
 
 // Assign files to partitions using "First Fit Decreasing" (FFD)
-// TODO: consider adding a slop factor here?
 splitFiles.foreach { file =>
   if (currentSize + file.length > maxSplitBytes) {
 closePartition()

http://git-wip-us.apache.org/repos/asf/spark/blob/def3690f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 4ad91dc..1235a4b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
@@ -37,14 +38,15 @@ class CatalogFileIndex(
 val table: CatalogTable,
 override val sizeInBytes: Long) extends FileIndex {
 
-  protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+  protected val hadoopConf: Configuration = 
sparkSession.sessionState.newHadoopConf()
 
-  private val fileStatusCache = FileStatusCache.newCache(sparkSession)
+  /** Globally shared (not exclusive to this table) cache for file statuses to 
speed up listing. */
+  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
 
   assert(table.identifier.database.isDefined,
 "The table identifier must be

spark git commit: [SQL] Minor readability improvement for partition handling code

2016-12-21 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master ff7d82a20 -> 7c5b7b3a2


[SQL] Minor readability improvement for partition handling code

## What changes were proposed in this pull request?
This patch includes minor changes to improve the readability of the partition handling 
code. While implementing a new feature, I found some of the naming / implicit type 
inference not as intuitive as it could be.

## How was this patch tested?
This patch should have no semantic change and the changes should be covered by 
existing test cases.

Author: Reynold Xin 

Closes #16378 from rxin/minor-fix.
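
The scan code touched below assigns files to read partitions with a "First Fit
Decreasing"-style loop (the hunk that drops the TODO comment). The following is a
simplified, hand-written sketch of that packing idea, not Spark's actual
implementation; the function name and sizes are illustrative.

```scala
// Simplified sketch (illustrative names) of the "First Fit Decreasing"-style
// packing used when assigning files to read partitions: sort sizes in
// descending order, then close the current partition whenever adding the next
// file would exceed the target split size.
def packFiles(fileSizes: Seq[Long], maxSplitBytes: Long): Seq[Seq[Long]] = {
  val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[Long]]
  val current = scala.collection.mutable.ArrayBuffer.empty[Long]
  var currentSize = 0L

  def closePartition(): Unit = {
    if (current.nonEmpty) partitions += current.toSeq
    current.clear()
    currentSize = 0L
  }

  fileSizes.sorted(Ordering[Long].reverse).foreach { size =>
    if (currentSize + size > maxSplitBytes) closePartition()
    current += size
    currentSize += size
  }
  closePartition()
  partitions.toSeq
}

// packFiles(Seq(10L, 40L, 70L, 20L), maxSplitBytes = 80L)
//   => Seq(Seq(70), Seq(40, 20, 10))
```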


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c5b7b3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c5b7b3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c5b7b3a

Branch: refs/heads/master
Commit: 7c5b7b3a2e5a7c1b2d0d8ce655840cad581e47ac
Parents: ff7d82a
Author: Reynold Xin 
Authored: Thu Dec 22 15:29:56 2016 +0800
Committer: Wenchen Fan 
Committed: Thu Dec 22 15:29:56 2016 +0800

--
 .../sql/execution/DataSourceScanExec.scala  |  7 +-
 .../datasources/CatalogFileIndex.scala  | 11 +--
 .../sql/execution/datasources/FileFormat.scala  |  3 +-
 .../execution/datasources/FileStatusCache.scala | 72 ++--
 4 files changed, 49 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7c5b7b3a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index e485b52..7616164 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -136,7 +136,7 @@ case class RowDataSourceScanExec(
  * @param outputSchema Output schema of the scan.
  * @param partitionFilters Predicates to use for partition pruning.
  * @param dataFilters Data source filters to use for filtering data within 
partitions.
- * @param metastoreTableIdentifier
+ * @param metastoreTableIdentifier identifier for the table in the metastore.
  */
 case class FileSourceScanExec(
 @transient relation: HadoopFsRelation,
@@ -147,10 +147,10 @@ case class FileSourceScanExec(
 override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
 
-  val supportsBatch = relation.fileFormat.supportBatch(
+  val supportsBatch: Boolean = relation.fileFormat.supportBatch(
 relation.sparkSession, StructType.fromAttributes(output))
 
-  val needsUnsafeRowConversion = if 
(relation.fileFormat.isInstanceOf[ParquetSource]) {
+  val needsUnsafeRowConversion: Boolean = if 
(relation.fileFormat.isInstanceOf[ParquetSource]) {
 
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
   } else {
 false
@@ -516,7 +516,6 @@ case class FileSourceScanExec(
 }
 
 // Assign files to partitions using "First Fit Decreasing" (FFD)
-// TODO: consider adding a slop factor here?
 splitFiles.foreach { file =>
   if (currentSize + file.length > maxSplitBytes) {
 closePartition()

http://git-wip-us.apache.org/repos/asf/spark/blob/7c5b7b3a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 4ad91dc..1235a4b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
@@ -37,14 +38,15 @@ class CatalogFileIndex(
 val table: CatalogTable,
 override val sizeInBytes: Long) extends FileIndex {
 
-  protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+  protected val hadoopConf: Configuration = 
sparkSession.sessionState.newHadoopConf()
 
-  private val fileStatusCache = FileStatusCache.newCache(sparkSession)
+  /** Globally shared (not exclusive to this table) cache for file statuses to 
speed up listing. */
+  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
 
   assert(table.identifier.database.isDefined,
 "The table identifier must be qualified in CatalogFil

spark git commit: [SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created

2016-12-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 9a3c5bd70 -> 07e2a17d1


[SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan 
is created

## What changes were proposed in this pull request?

This PR audits places using `logicalPlan` in StreamExecution and ensures they 
all handle the case where `logicalPlan` cannot be created.

In addition, this PR also fixes the following issues in 
`StreamingQueryException`:
- `StreamingQueryException` and `StreamExecution` are cyclically dependent: the 
`StreamingQueryException` constructor calls `StreamExecution`'s `toDebugString`, 
which in turn uses `StreamingQueryException`, so a `null` value ends up in the 
error message.
- The stack trace is duplicated when calling `Throwable.printStackTrace` because 
`StreamingQueryException`'s `toString` contains the stack trace.

## How was this patch tested?

The updated `test("max files per trigger - incorrect values")`. I found this 
issue when I switched from `testStream` to the real codes to verify the failure 
in this test.

Author: Shixiong Zhu 

Closes #16322 from zsxwing/SPARK-18907.

(cherry picked from commit ff7d82a207e8bef7779c27378f7a50a138627341)
Signed-off-by: Shixiong Zhu 
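
To make the first bullet concrete, here is a small stand-alone sketch of the cycle:
the exception's message is built from a debug string that references the exception
itself, which has not been assigned yet, so "null" leaks into the message. The class
names are illustrative, not Spark's.

```scala
// Stand-alone sketch (illustrative names) of the cycle described above: the
// exception message embeds a debug string that refers back to the exception,
// which is not assigned until after construction, so it prints as null.
class QueryRunner {
  @volatile var lastException: QueryException = _   // assigned only after construction

  def toDebugString: String = s"QueryRunner(lastException = $lastException)"

  def fail(): Unit = {
    lastException = new QueryException(this)        // constructor reads toDebugString first
    throw lastException
  }
}

class QueryException(runner: QueryRunner)
  extends Exception(s"Query failed\n${runner.toDebugString}")

// new QueryRunner().fail()
//   => QueryException: Query failed
//      QueryRunner(lastException = null)
```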


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07e2a17d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07e2a17d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07e2a17d

Branch: refs/heads/branch-2.1
Commit: 07e2a17d1cb7eade93d482d18a2079e9e6f40f57
Parents: 9a3c5bd
Author: Shixiong Zhu 
Authored: Wed Dec 21 22:02:57 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 21 22:03:05 2016 -0800

--
 .../execution/streaming/StreamExecution.scala   | 141 ---
 .../sql/streaming/StreamingQueryException.scala |  28 +---
 .../sql/streaming/FileStreamSourceSuite.scala   |  39 +++--
 .../spark/sql/streaming/StreamSuite.scala   |   3 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  52 +--
 .../streaming/StreamingQueryListenerSuite.scala |   2 +-
 .../sql/streaming/StreamingQuerySuite.scala |   4 +-
 7 files changed, 165 insertions(+), 104 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/07e2a17d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e05200d..a35950e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.streaming._
@@ -67,6 +66,7 @@ class StreamExecution(
   private val awaitBatchLock = new ReentrantLock(true)
   private val awaitBatchLockCondition = awaitBatchLock.newCondition()
 
+  private val initializationLatch = new CountDownLatch(1)
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
 
@@ -118,9 +118,22 @@ class StreamExecution(
   private val prettyIdString =
 Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  /**
+   * All stream sources present in the query plan. This will be set when 
generating logical plan.
+   */
+  @volatile protected var sources: Seq[Source] = Seq.empty
+
+  /**
+   * A list of unique sources in the query plan. This will be set when 
generating logical plan.
+   */
+  @volatile private var uniqueSources: Seq[Source] = Seq.empty
+
   override lazy val logicalPlan: LogicalPlan = {
+assert(microBatchThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
 var nextSourceId = 0L
-analyzedPlan.transform {
+val _logicalPlan = analyzedPlan.transform {
   case StreamingRelation(dataSource, _, output) =>
 // Materialize source to avoid creating it in every batch
 val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
@@ -130,22 +143,18 @@ class StreamExecution(
 // "df.logicalPlan" has already used attributes of the previous 
`output`.

spark git commit: [SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created

2016-12-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master e1b43dc45 -> ff7d82a20


[SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan 
is created

## What changes were proposed in this pull request?

This PR audits places using `logicalPlan` in StreamExecution and ensures they 
all handle the case where `logicalPlan` cannot be created.

In addition, this PR also fixes the following issues in 
`StreamingQueryException`:
- `StreamingQueryException` and `StreamExecution` are cyclically dependent: the 
`StreamingQueryException` constructor calls `StreamExecution`'s `toDebugString`, 
which in turn uses `StreamingQueryException`, so a `null` value ends up in the 
error message.
- The stack trace is duplicated when calling `Throwable.printStackTrace` because 
`StreamingQueryException`'s `toString` contains the stack trace.

## How was this patch tested?

The updated `test("max files per trigger - incorrect values")`. I found this 
issue when I switched from `testStream` to the real codes to verify the failure 
in this test.

Author: Shixiong Zhu 

Closes #16322 from zsxwing/SPARK-18907.
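
The fix in the diff below also turns `logicalPlan` into a `lazy val` whose initializer
asserts that it is first forced on the stream-execution thread. A generic sketch of
that thread-confined lazy-initialization pattern; the class and field names are
illustrative, not Spark's.

```scala
// Generic sketch (illustrative names): a lazy val that asserts it is first
// forced on the thread that owns it, so expensive, thread-confined work cannot
// accidentally run on a caller's thread.
class Execution {
  @volatile private var ownerThread: Thread = _

  lazy val plan: String = {
    assert(ownerThread eq Thread.currentThread,
      s"plan must be initialized on $ownerThread, not ${Thread.currentThread}")
    "resolved plan"                       // stands in for the real, expensive work
  }

  def start(): Unit = {
    ownerThread = new Thread("execution-thread") {
      override def run(): Unit = {
        plan                              // forces initialization on the owning thread
        // ... run batches ...
      }
    }
    ownerThread.start()
  }
}
```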


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff7d82a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff7d82a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff7d82a2

Branch: refs/heads/master
Commit: ff7d82a207e8bef7779c27378f7a50a138627341
Parents: e1b43dc
Author: Shixiong Zhu 
Authored: Wed Dec 21 22:02:57 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 21 22:02:57 2016 -0800

--
 .../execution/streaming/StreamExecution.scala   | 141 ---
 .../sql/streaming/StreamingQueryException.scala |  28 +---
 .../sql/streaming/FileStreamSourceSuite.scala   |  39 +++--
 .../spark/sql/streaming/StreamSuite.scala   |   3 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  52 +--
 .../streaming/StreamingQueryListenerSuite.scala |   2 +-
 .../sql/streaming/StreamingQuerySuite.scala |   4 +-
 7 files changed, 165 insertions(+), 104 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e05200d..a35950e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.streaming._
@@ -67,6 +66,7 @@ class StreamExecution(
   private val awaitBatchLock = new ReentrantLock(true)
   private val awaitBatchLockCondition = awaitBatchLock.newCondition()
 
+  private val initializationLatch = new CountDownLatch(1)
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
 
@@ -118,9 +118,22 @@ class StreamExecution(
   private val prettyIdString =
 Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  /**
+   * All stream sources present in the query plan. This will be set when 
generating logical plan.
+   */
+  @volatile protected var sources: Seq[Source] = Seq.empty
+
+  /**
+   * A list of unique sources in the query plan. This will be set when 
generating logical plan.
+   */
+  @volatile private var uniqueSources: Seq[Source] = Seq.empty
+
   override lazy val logicalPlan: LogicalPlan = {
+assert(microBatchThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
 var nextSourceId = 0L
-analyzedPlan.transform {
+val _logicalPlan = analyzedPlan.transform {
   case StreamingRelation(dataSource, _, output) =>
 // Materialize source to avoid creating it in every batch
 val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
@@ -130,22 +143,18 @@ class StreamExecution(
 // "df.logicalPlan" has already used attributes of the previous 
`output`.
 StreamingExecutionRelation(source, output)
 }
+sources = _logicalPlan.collect { case s: St

spark git commit: [BUILD] make-distribution should find JAVA_HOME for non-RHEL systems

2016-12-21 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master afe36516e -> e1b43dc45


[BUILD] make-distribution should find JAVA_HOME for non-RHEL systems

## What changes were proposed in this pull request?

make-distribution.sh should find JAVA_HOME on Ubuntu, macOS, and other non-RHEL 
systems.

## How was this patch tested?

Manually

Author: Felix Cheung 

Closes #16363 from felixcheung/buildjava.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1b43dc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1b43dc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1b43dc4

Branch: refs/heads/master
Commit: e1b43dc45b136a89f105c04c3074233f52568152
Parents: afe3651
Author: Felix Cheung 
Authored: Wed Dec 21 17:24:53 2016 -0800
Committer: Felix Cheung 
Committed: Wed Dec 21 17:24:53 2016 -0800

--
 dev/make-distribution.sh | 7 +++
 1 file changed, 7 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e1b43dc4/dev/make-distribution.sh
--
diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh
index 6ea319e..6c5ae0d 100755
--- a/dev/make-distribution.sh
+++ b/dev/make-distribution.sh
@@ -102,6 +102,13 @@ if [ -z "$JAVA_HOME" ]; then
   echo "No JAVA_HOME set, proceeding with '$JAVA_HOME' learned from rpm"
 fi
   fi
+
+  if [ -z "$JAVA_HOME" ]; then
+if [ `command -v java` ]; then
+  # If java is in /usr/bin/java, we want /usr
+  JAVA_HOME="$(dirname $(dirname $(which java)))"
+fi
+  fi
 fi
 
 if [ -z "$JAVA_HOME" ]; then





spark git commit: [FLAKY-TEST] InputStreamsSuite.socket input stream

2016-12-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 021952d58 -> 9a3c5bd70


[FLAKY-TEST] InputStreamsSuite.socket input stream

## What changes were proposed in this pull request?

https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.streaming.InputStreamsSuite&test_name=socket+input+stream

## How was this patch tested?

Tested 2,000 times.

Author: Burak Yavuz 

Closes #16343 from brkyvz/sock.

(cherry picked from commit afe36516e4b4031196ee2e0a04980ac49208ea6b)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a3c5bd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a3c5bd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a3c5bd7

Branch: refs/heads/branch-2.1
Commit: 9a3c5bd7082474cfb01f021aef103e44d12e2ff1
Parents: 021952d
Author: Burak Yavuz 
Authored: Wed Dec 21 17:23:48 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 21 17:23:58 2016 -0800

--
 .../spark/streaming/InputStreamsSuite.scala | 55 
 1 file changed, 23 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9a3c5bd7/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 9ecfa48..6fb50a4 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -67,42 +67,33 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
 val expectedOutput = input.map(_.toString)
 for (i <- input.indices) {
   testServer.send(input(i).toString + "\n")
-  Thread.sleep(500)
   clock.advance(batchDuration.milliseconds)
 }
-// Make sure we finish all batches before "stop"
-if (!batchCounter.waitUntilBatchesCompleted(input.size, 3)) {
-  fail("Timeout: cannot finish all batches in 30 seconds")
+
+eventually(eventuallyTimeout) {
+  clock.advance(batchDuration.milliseconds)
+  // Verify whether data received was as expected
+  logInfo("")
+  logInfo("output.size = " + outputQueue.size)
+  logInfo("output")
+  outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + 
"]"))
+  logInfo("expected output.size = " + expectedOutput.size)
+  logInfo("expected output")
+  expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+  logInfo("")
+
+  // Verify whether all the elements received are as expected
+  // (whether the elements were received one in each interval is not 
verified)
+  val output: Array[String] = outputQueue.asScala.flatMap(x => 
x).toArray
+  assert(output.length === expectedOutput.size)
+  for (i <- output.indices) {
+assert(output(i) === expectedOutput(i))
+  }
 }
 
-// Ensure progress listener has been notified of all events
-ssc.sparkContext.listenerBus.waitUntilEmpty(500)
-
-// Verify all "InputInfo"s have been reported
-assert(ssc.progressListener.numTotalReceivedRecords === input.size)
-assert(ssc.progressListener.numTotalProcessedRecords === input.size)
-
-logInfo("Stopping server")
-testServer.stop()
-logInfo("Stopping context")
-ssc.stop()
-
-// Verify whether data received was as expected
-logInfo("")
-logInfo("output.size = " + outputQueue.size)
-logInfo("output")
-outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-logInfo("expected output.size = " + expectedOutput.size)
-logInfo("expected output")
-expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-logInfo("")
-
-// Verify whether all the elements received are as expected
-// (whether the elements were received one in each interval is not 
verified)
-val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
-assert(output.length === expectedOutput.size)
-for (i <- output.indices) {
-  assert(output(i) === expectedOutput(i))
+eventually(eventuallyTimeout) {
+  assert(ssc.progressListener.numTotalReceivedRecords === input.length)
+  assert(ssc.progressListener.numTotalProcessedRecords === 
input.length)
 }
   }
 }



spark git commit: [FLAKY-TEST] InputStreamsSuite.socket input stream

2016-12-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 7e8994ffd -> afe36516e


[FLAKY-TEST] InputStreamsSuite.socket input stream

## What changes were proposed in this pull request?

https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.streaming.InputStreamsSuite&test_name=socket+input+stream

## How was this patch tested?

Tested 2,000 times.

Author: Burak Yavuz 

Closes #16343 from brkyvz/sock.
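
The patch replaces fixed `Thread.sleep` waits with ScalaTest's `eventually`, which
re-runs the assertion block until it passes or a timeout expires. A minimal sketch of
that pattern, assuming ScalaTest is on the classpath; the names and durations are
illustrative.

```scala
// Minimal sketch of the `eventually` pattern the patch switches to: retry the
// enclosed assertions until they pass or the timeout expires, instead of
// sleeping for a fixed amount of time.
import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch {
  @volatile private var received = 0             // updated by an async worker

  def main(args: Array[String]): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(100); received = 5 }
    }).start()

    // Re-evaluate the block every 50 ms until it passes or 10 seconds elapse.
    eventually(timeout(10.seconds), interval(50.millis)) {
      assert(received == 5)
    }
  }
}
```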


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afe36516
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afe36516
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afe36516

Branch: refs/heads/master
Commit: afe36516e4b4031196ee2e0a04980ac49208ea6b
Parents: 7e8994f
Author: Burak Yavuz 
Authored: Wed Dec 21 17:23:48 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 21 17:23:48 2016 -0800

--
 .../spark/streaming/InputStreamsSuite.scala | 55 
 1 file changed, 23 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/afe36516/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 9ecfa48..6fb50a4 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -67,42 +67,33 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
 val expectedOutput = input.map(_.toString)
 for (i <- input.indices) {
   testServer.send(input(i).toString + "\n")
-  Thread.sleep(500)
   clock.advance(batchDuration.milliseconds)
 }
-// Make sure we finish all batches before "stop"
-if (!batchCounter.waitUntilBatchesCompleted(input.size, 3)) {
-  fail("Timeout: cannot finish all batches in 30 seconds")
+
+eventually(eventuallyTimeout) {
+  clock.advance(batchDuration.milliseconds)
+  // Verify whether data received was as expected
+  logInfo("")
+  logInfo("output.size = " + outputQueue.size)
+  logInfo("output")
+  outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + 
"]"))
+  logInfo("expected output.size = " + expectedOutput.size)
+  logInfo("expected output")
+  expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+  logInfo("")
+
+  // Verify whether all the elements received are as expected
+  // (whether the elements were received one in each interval is not 
verified)
+  val output: Array[String] = outputQueue.asScala.flatMap(x => 
x).toArray
+  assert(output.length === expectedOutput.size)
+  for (i <- output.indices) {
+assert(output(i) === expectedOutput(i))
+  }
 }
 
-// Ensure progress listener has been notified of all events
-ssc.sparkContext.listenerBus.waitUntilEmpty(500)
-
-// Verify all "InputInfo"s have been reported
-assert(ssc.progressListener.numTotalReceivedRecords === input.size)
-assert(ssc.progressListener.numTotalProcessedRecords === input.size)
-
-logInfo("Stopping server")
-testServer.stop()
-logInfo("Stopping context")
-ssc.stop()
-
-// Verify whether data received was as expected
-logInfo("")
-logInfo("output.size = " + outputQueue.size)
-logInfo("output")
-outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-logInfo("expected output.size = " + expectedOutput.size)
-logInfo("expected output")
-expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-logInfo("")
-
-// Verify whether all the elements received are as expected
-// (whether the elements were received one in each interval is not 
verified)
-val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
-assert(output.length === expectedOutput.size)
-for (i <- output.indices) {
-  assert(output(i) === expectedOutput(i))
+eventually(eventuallyTimeout) {
+  assert(ssc.progressListener.numTotalReceivedRecords === input.length)
+  assert(ssc.progressListener.numTotalProcessedRecords === 
input.length)
 }
   }
 }



spark git commit: [SPARK-18903][SPARKR] Add API to get SparkUI URL

2016-12-21 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master b41ec9977 -> 7e8994ffd


[SPARK-18903][SPARKR] Add API to get SparkUI URL

## What changes were proposed in this pull request?

API for SparkUI URL from SparkContext

## How was this patch tested?

manual, unit tests

Author: Felix Cheung 

Closes #16367 from felixcheung/rwebui.
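
The new R helper surfaces the JVM-side `SparkContext.uiWebUrl`, which in Scala is an
`Option[String]` that is empty when the UI is disabled or not started. A minimal
Scala sketch of the same lookup; the app name is illustrative.

```scala
// The Scala-side value the new R helper surfaces: SparkContext.uiWebUrl is an
// Option[String] that is empty when the UI is disabled or not started.
import org.apache.spark.sql.SparkSession

object UiWebUrlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ui-url-sketch").getOrCreate()

    spark.sparkContext.uiWebUrl match {
      case Some(url) => println(s"Spark UI available at $url")   // e.g. http://host:4040
      case None      => println("Spark UI is disabled or not started")
    }

    spark.stop()
  }
}
```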


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e8994ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e8994ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e8994ff

Branch: refs/heads/master
Commit: 7e8994ffd3d646adb0a769229637931e43cd12b0
Parents: b41ec99
Author: Felix Cheung 
Authored: Wed Dec 21 17:21:17 2016 -0800
Committer: Felix Cheung 
Committed: Wed Dec 21 17:21:17 2016 -0800

--
 R/pkg/NAMESPACE   |  1 +
 R/pkg/R/sparkR.R  | 24 
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  5 -
 3 files changed, 29 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7e8994ff/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 377f942..c3ec3f4 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -16,6 +16,7 @@ export("sparkR.stop")
 export("sparkR.session.stop")
 export("sparkR.conf")
 export("sparkR.version")
+export("sparkR.uiWebUrl")
 export("print.jobj")
 
 export("sparkR.newJObject")

http://git-wip-us.apache.org/repos/asf/spark/blob/7e8994ff/R/pkg/R/sparkR.R
--
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index c57cc8f..e9d42c1 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -410,6 +410,30 @@ sparkR.session <- function(
   sparkSession
 }
 
+#' Get the URL of the SparkUI instance for the current active SparkSession
+#'
+#' Get the URL of the SparkUI instance for the current active SparkSession.
+#'
+#' @return the SparkUI URL, or NA if it is disabled, or not started.
+#' @rdname sparkR.uiWebUrl
+#' @name sparkR.uiWebUrl
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' url <- sparkR.uiWebUrl()
+#' }
+#' @note sparkR.uiWebUrl since 2.2.0
+sparkR.uiWebUrl <- function() {
+  sc <- sparkR.callJMethod(getSparkContext(), "sc")
+  u <- callJMethod(sc, "uiWebUrl")
+  if (callJMethod(u, "isDefined")) {
+callJMethod(u, "get")
+  } else {
+NA
+  }
+}
+
 #' Assigns a group ID to all the jobs started by this thread until the group 
ID is set to a
 #' different value or cleared.
 #'

http://git-wip-us.apache.org/repos/asf/spark/blob/7e8994ff/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 2e95737..4490f31 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2613,7 +2613,7 @@ test_that("randomSplit", {
   expect_true(all(sapply(abs(counts / num - weights / sum(weights)), 
function(e) { e < 0.05 })))
 })
 
-test_that("Setting and getting config on SparkSession", {
+test_that("Setting and getting config on SparkSession, sparkR.conf(), 
sparkR.uiWebUrl()", {
   # first, set it to a random but known value
   conf <- callJMethod(sparkSession, "conf")
   property <- paste0("spark.testing.", as.character(runif(1)))
@@ -2637,6 +2637,9 @@ test_that("Setting and getting config on SparkSession", {
   expect_equal(appNameValue, "sparkSession test")
   expect_equal(testValue, value)
   expect_error(sparkR.conf("completely.dummy"), "Config 'completely.dummy' is 
not set")
+
+  url <- sparkR.uiWebUrl()
+  expect_equal(substr(url, 1, 7), "http://")
 })
 
 test_that("enableHiveSupport on SparkSession", {





spark git commit: [SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer

2016-12-21 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 53cd99f65 -> 080ac37fb


[SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer

## What changes were proposed in this pull request?
This PR fixes a `NullPointerException` caused by the following `limit + 
aggregate` query:
```
scala> val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
scala> df.limit(2).groupBy("id").count().show
WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, 
lvsp20hdn012.stubprod.com): java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
```
The root culprit is that 
[`$doAgg()`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L596)
 skips the initialization of [the buffer 
iterator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L603);
 `BaseLimitExec` sets `stopEarly=true` and `$doAgg()` exits in the middle 
without the initialization.

## How was this patch tested?
Added a test to check if no exception happens for limit + aggregates in 
`DataFrameAggregateSuite.scala`.

Author: Takeshi YAMAMURO 

Closes #15980 from maropu/SPARK-18528.

(cherry picked from commit b41ec997786e2be42a8a2a182212a610d08b221b)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/080ac37f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/080ac37f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/080ac37f

Branch: refs/heads/branch-2.0
Commit: 080ac37fb5ff5f6b8781863866e8099eb9be4dba
Parents: 53cd99f
Author: Takeshi YAMAMURO 
Authored: Thu Dec 22 01:53:33 2016 +0100
Committer: Herman van Hovell 
Committed: Thu Dec 22 01:54:00 2016 +0100

--
 .../apache/spark/sql/execution/BufferedRowIterator.java   | 10 ++
 .../spark/sql/execution/WholeStageCodegenExec.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/execution/limit.scala |  6 +++---
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala|  8 
 4 files changed, 22 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/080ac37f/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
index 086547c..730a4ae 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
@@ -70,6 +70,16 @@ public abstract class BufferedRowIterator {
   }
 
   /**
+   * Returns whether this iterator should stop fetching next row from 
[[CodegenSupport#inputRDDs]].
+   *
+   * If it returns true, the caller should exit the loop that [[InputAdapter]] 
generates.
+   * This interface is mainly used to limit the number of input rows.
+   */
+  protected boolean stopEarly() {
+return false;
+  }
+
+  /**
* Returns whether `processNext()` should stop processing next row from 
`input` or not.
*
* If it returns true, the caller should exit the loop (return from 
processNext()).

http://git-wip-us.apache.org/repos/asf/spark/blob/080ac37f/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index fb57ed7..697db39 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -239,7 +239,7 @@ case class InputAdapter(child: SparkPlan) extends 
UnaryExecNode with CodegenSupp
 ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
 val row = ctx.freshName("row")
 s"""
-   | while ($input.hasNext()) {
+   | while ($input.hasNext() && !stopEarly()) {
|   InternalRow $row = (InternalRow) $input.next();
|   ${consume(ctx, null, row).trim}
|   if (shouldStop()) return;

http://git-wip-us.apache.org/repos/asf/spark/blob/080ac37f/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
---

spark git commit: [SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer

2016-12-21 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 60e02a173 -> 021952d58


[SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer

## What changes were proposed in this pull request?
This PR fixes a `NullPointerException` caused by the following `limit + 
aggregate` query:
```
scala> val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
scala> df.limit(2).groupBy("id").count().show
WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, 
lvsp20hdn012.stubprod.com): java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
```
The root culprit is that 
[`$doAgg()`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L596)
 skips the initialization of [the buffer 
iterator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L603);
 `BaseLimitExec` sets `stopEarly=true` and `$doAgg()` exits in the middle 
without the initialization.

## How was this patch tested?
Added a test to check if no exception happens for limit + aggregates in 
`DataFrameAggregateSuite.scala`.

Author: Takeshi YAMAMURO 

Closes #15980 from maropu/SPARK-18528.

(cherry picked from commit b41ec997786e2be42a8a2a182212a610d08b221b)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/021952d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/021952d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/021952d5

Branch: refs/heads/branch-2.1
Commit: 021952d5808715d0b9d6c716f8b67cd550f7982e
Parents: 60e02a1
Author: Takeshi YAMAMURO 
Authored: Thu Dec 22 01:53:33 2016 +0100
Committer: Herman van Hovell 
Committed: Thu Dec 22 01:53:44 2016 +0100

--
 .../apache/spark/sql/execution/BufferedRowIterator.java   | 10 ++
 .../spark/sql/execution/WholeStageCodegenExec.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/execution/limit.scala |  6 +++---
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala|  8 
 4 files changed, 22 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/021952d5/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
index 086547c..730a4ae 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
@@ -70,6 +70,16 @@ public abstract class BufferedRowIterator {
   }
 
   /**
+   * Returns whether this iterator should stop fetching next row from 
[[CodegenSupport#inputRDDs]].
+   *
+   * If it returns true, the caller should exit the loop that [[InputAdapter]] 
generates.
+   * This interface is mainly used to limit the number of input rows.
+   */
+  protected boolean stopEarly() {
+return false;
+  }
+
+  /**
* Returns whether `processNext()` should stop processing next row from 
`input` or not.
*
* If it returns true, the caller should exit the loop (return from 
processNext()).

http://git-wip-us.apache.org/repos/asf/spark/blob/021952d5/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 516b9d5..2ead8f6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -241,7 +241,7 @@ case class InputAdapter(child: SparkPlan) extends 
UnaryExecNode with CodegenSupp
 ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
 val row = ctx.freshName("row")
 s"""
-   | while ($input.hasNext()) {
+   | while ($input.hasNext() && !stopEarly()) {
|   InternalRow $row = (InternalRow) $input.next();
|   ${consume(ctx, null, row).trim}
|   if (shouldStop()) return;

http://git-wip-us.apache.org/repos/asf/spark/blob/021952d5/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
---

spark git commit: [SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer

2016-12-21 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 83a6ace0d -> b41ec9977


[SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer

## What changes were proposed in this pull request?
This PR fixes a `NullPointerException` caused by the following `limit + 
aggregate` query:
```
scala> val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
scala> df.limit(2).groupBy("id").count().show
WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, 
lvsp20hdn012.stubprod.com): java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
```
The root culprit is that 
[`$doAgg()`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L596)
 skips the initialization of [the buffer 
iterator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L603);
 `BaseLimitExec` sets `stopEarly=true` and `$doAgg()` exits in the middle 
without the initialization.

## How was this patch tested?
Added a test to check if no exception happens for limit + aggregates in 
`DataFrameAggregateSuite.scala`.

Author: Takeshi YAMAMURO 

Closes #15980 from maropu/SPARK-18528.
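
As a hand-written analogue of the failure mode described above (not the generated
code itself): if the aggregation routine returns from the middle of its loop, the
buffer iterator is never assigned and the next access dereferences null. Names are
illustrative.

```scala
// Simplified hand-written analogue (illustrative names) of the bug described
// above: if doAgg() returns from the middle of its loop, the buffer iterator is
// never assigned and the later access dereferences null.
class AggSketch(input: Iterator[Int]) {
  private var bufferIter: Iterator[(Int, Int)] = _    // assigned at the end of doAgg()

  private def doAgg(limit: Int): Unit = {
    val counts = scala.collection.mutable.Map.empty[Int, Int]
    var consumed = 0
    while (input.hasNext && consumed < limit) {        // stop early, but fall through
      val k = input.next()
      counts(k) = counts.getOrElse(k, 0) + 1
      consumed += 1
      // A "return" here instead of falling through would skip the assignment
      // below -- that is the shape of the NullPointerException in this PR.
    }
    bufferIter = counts.iterator                       // must run even when stopping early
  }

  def show(limit: Int): Unit = {
    doAgg(limit)
    bufferIter.foreach { case (k, c) => println(s"$k -> $c") }
  }
}

// new AggSketch(Iterator(1, 2, 1, 5)).show(limit = 2)
```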


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b41ec997
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b41ec997
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b41ec997

Branch: refs/heads/master
Commit: b41ec997786e2be42a8a2a182212a610d08b221b
Parents: 83a6ace
Author: Takeshi YAMAMURO 
Authored: Thu Dec 22 01:53:33 2016 +0100
Committer: Herman van Hovell 
Committed: Thu Dec 22 01:53:33 2016 +0100

--
 .../apache/spark/sql/execution/BufferedRowIterator.java   | 10 ++
 .../spark/sql/execution/WholeStageCodegenExec.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/execution/limit.scala |  6 +++---
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala|  8 
 4 files changed, 22 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b41ec997/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
index 086547c..730a4ae 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
@@ -70,6 +70,16 @@ public abstract class BufferedRowIterator {
   }
 
   /**
+   * Returns whether this iterator should stop fetching next row from 
[[CodegenSupport#inputRDDs]].
+   *
+   * If it returns true, the caller should exit the loop that [[InputAdapter]] 
generates.
+   * This interface is mainly used to limit the number of input rows.
+   */
+  protected boolean stopEarly() {
+return false;
+  }
+
+  /**
* Returns whether `processNext()` should stop processing next row from 
`input` or not.
*
* If it returns true, the caller should exit the loop (return from 
processNext()).

http://git-wip-us.apache.org/repos/asf/spark/blob/b41ec997/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 516b9d5..2ead8f6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -241,7 +241,7 @@ case class InputAdapter(child: SparkPlan) extends 
UnaryExecNode with CodegenSupp
 ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
 val row = ctx.freshName("row")
 s"""
-   | while ($input.hasNext()) {
+   | while ($input.hasNext() && !stopEarly()) {
|   InternalRow $row = (InternalRow) $input.next();
|   ${consume(ctx, null, row).trim}
|   if (shouldStop()) return;

http://git-wip-us.apache.org/repos/asf/spark/blob/b41ec997/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala 
b/sql/c

spark git commit: [SPARK-18234][SS] Made update mode public

2016-12-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 17ef57fe8 -> 60e02a173


[SPARK-18234][SS] Made update mode public

## What changes were proposed in this pull request?

Made update mode public. As part of that, here are the changes:
- Updated DataStreamWriter to accept "update"
- Changed the package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst
- Added update-mode state removal with watermark to StateStoreSaveExec

## How was this patch tested?

Added new tests in changed modules

Author: Tathagata Das 

Closes #16360 from tdas/SPARK-18234.

(cherry picked from commit 83a6ace0d1be44f70e768348ae6688798c84343e)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60e02a17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60e02a17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60e02a17

Branch: refs/heads/branch-2.1
Commit: 60e02a173ddf335d58852e56611131ec4409ae8b
Parents: 17ef57f
Author: Tathagata Das 
Authored: Wed Dec 21 16:43:17 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 21 16:43:25 2016 -0800

--
 .../apache/spark/sql/streaming/OutputMode.java  |  12 +-
 .../apache/spark/sql/InternalOutputModes.scala  |  47 ---
 .../analysis/UnsupportedOperationChecker.scala  |   3 +-
 .../streaming/InternalOutputModes.scala |  47 +++
 .../analysis/UnsupportedOperationsSuite.scala   |   2 +-
 .../sql/execution/datasources/DataSource.scala  |   2 +-
 .../execution/streaming/StatefulAggregate.scala |  61 ++--
 .../spark/sql/execution/streaming/memory.scala  |   5 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  17 +-
 .../execution/streaming/MemorySinkSuite.scala   | 287 +++
 .../sql/streaming/EventTimeWatermarkSuite.scala |  55 +++-
 .../sql/streaming/FileStreamSinkSuite.scala |  22 +-
 .../sql/streaming/FileStreamSourceSuite.scala   |   2 +-
 .../spark/sql/streaming/MemorySinkSuite.scala   | 274 --
 .../spark/sql/streaming/StreamSuite.scala   |   8 +-
 .../streaming/StreamingAggregationSuite.scala   |   2 +-
 .../test/DataStreamReaderWriterSuite.scala  |  38 ++-
 17 files changed, 507 insertions(+), 377 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/60e02a17/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
--
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
index a515c1a..cf0579f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming;
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.InternalOutputModes;
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes;
 
 /**
  * :: Experimental ::
@@ -54,4 +54,14 @@ public class OutputMode {
   public static OutputMode Complete() {
 return InternalOutputModes.Complete$.MODULE$;
   }
+
+  /**
+   * OutputMode in which only the rows that were updated in the streaming 
DataFrame/Dataset will
+   * be written to the sink every time there are some updates.
+   *
+   * @since 2.1.1
+   */
+  public static OutputMode Update() {
+return InternalOutputModes.Update$.MODULE$;
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/60e02a17/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
deleted file mode 100644
index 594c41c..000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permis

spark git commit: [SPARK-18234][SS] Made update mode public

2016-12-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master afd9bc1d8 -> 83a6ace0d


[SPARK-18234][SS] Made update mode public

## What changes were proposed in this pull request?

Made update mode public. As part of that, here are the changes:
- Updated DataStreamWriter to accept "update"
- Changed the package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst
- Added update-mode state removal with watermark to StateStoreSaveExec

## How was this patch tested?

Added new tests in changed modules

Author: Tathagata Das 

Closes #16360 from tdas/SPARK-18234.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83a6ace0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83a6ace0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83a6ace0

Branch: refs/heads/master
Commit: 83a6ace0d1be44f70e768348ae6688798c84343e
Parents: afd9bc1
Author: Tathagata Das 
Authored: Wed Dec 21 16:43:17 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 21 16:43:17 2016 -0800

--
 .../apache/spark/sql/streaming/OutputMode.java  |  12 +-
 .../apache/spark/sql/InternalOutputModes.scala  |  47 ---
 .../analysis/UnsupportedOperationChecker.scala  |   3 +-
 .../streaming/InternalOutputModes.scala |  47 +++
 .../analysis/UnsupportedOperationsSuite.scala   |   2 +-
 .../sql/execution/datasources/DataSource.scala  |   2 +-
 .../execution/streaming/StatefulAggregate.scala |  61 ++--
 .../spark/sql/execution/streaming/memory.scala  |   5 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  17 +-
 .../execution/streaming/MemorySinkSuite.scala   | 287 +++
 .../sql/streaming/EventTimeWatermarkSuite.scala |  55 +++-
 .../sql/streaming/FileStreamSinkSuite.scala |  22 +-
 .../sql/streaming/FileStreamSourceSuite.scala   |   2 +-
 .../spark/sql/streaming/MemorySinkSuite.scala   | 274 --
 .../spark/sql/streaming/StreamSuite.scala   |   8 +-
 .../streaming/StreamingAggregationSuite.scala   |   2 +-
 .../test/DataStreamReaderWriterSuite.scala  |  38 ++-
 17 files changed, 507 insertions(+), 377 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
--
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
index a515c1a..cf0579f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming;
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.InternalOutputModes;
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes;
 
 /**
  * :: Experimental ::
@@ -54,4 +54,14 @@ public class OutputMode {
   public static OutputMode Complete() {
 return InternalOutputModes.Complete$.MODULE$;
   }
+
+  /**
+   * OutputMode in which only the rows that were updated in the streaming 
DataFrame/Dataset will
+   * be written to the sink every time there are some updates.
+   *
+   * @since 2.1.1
+   */
+  public static OutputMode Update() {
+return InternalOutputModes.Update$.MODULE$;
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
deleted file mode 100644
index 594c41c..000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark

spark git commit: [SPARK-17807][CORE] split test-tags into test-JAR

2016-12-21 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 95efc895e -> afd9bc1d8


[SPARK-17807][CORE] split test-tags into test-JAR

Remove spark-tags' compile-scope dependency (and, indirectly, spark-core's compile-scope transitive dependency) on scalatest by splitting the test-oriented tags into spark-tags' test JAR.
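
For readers building against Spark with sbt rather than Maven, a rough equivalent of depending on the split-out test JAR might look like the line below; the version string is a placeholder and the sbt form is an assumption, not part of this patch:

```Scala
// build.sbt sketch (placeholder version): pull in spark-tags' test JAR only for tests,
// mirroring the Maven <type>test-jar</type> / <scope>test</scope> dependency added here.
libraryDependencies += "org.apache.spark" %% "spark-tags" % "2.2.0-SNAPSHOT" % "test" classifier "tests"
```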

Alternative to #16303.

Author: Ryan Williams 

Closes #16311 from ryan-williams/tt.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afd9bc1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afd9bc1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afd9bc1d

Branch: refs/heads/master
Commit: afd9bc1d8a85adf88c412d8bc75e46e7ecb4bcdd
Parents: 95efc89
Author: Ryan Williams 
Authored: Wed Dec 21 16:37:20 2016 -0800
Committer: Marcelo Vanzin 
Committed: Wed Dec 21 16:37:20 2016 -0800

--
 common/network-common/pom.xml   | 12 +
 common/network-shuffle/pom.xml  | 12 +
 common/network-yarn/pom.xml | 11 
 common/sketch/pom.xml   | 12 +
 common/tags/pom.xml |  8 --
 .../java/org/apache/spark/tags/DockerTest.java  | 26 ---
 .../org/apache/spark/tags/ExtendedHiveTest.java | 27 
 .../org/apache/spark/tags/ExtendedYarnTest.java | 27 
 .../java/org/apache/spark/tags/DockerTest.java  | 26 +++
 .../org/apache/spark/tags/ExtendedHiveTest.java | 27 
 .../org/apache/spark/tags/ExtendedYarnTest.java | 27 
 common/unsafe/pom.xml   | 12 +
 core/pom.xml| 12 +
 external/docker-integration-tests/pom.xml   |  2 +-
 external/flume-sink/pom.xml | 12 +
 external/flume/pom.xml  | 12 +
 external/java8-tests/pom.xml| 12 +
 external/kafka-0-10-sql/pom.xml | 12 +
 external/kafka-0-10/pom.xml | 12 +
 external/kafka-0-8/pom.xml  | 12 +
 external/kinesis-asl/pom.xml| 12 +
 graphx/pom.xml  | 12 +
 launcher/pom.xml| 11 
 mllib-local/pom.xml | 12 +
 mllib/pom.xml   | 12 +
 pom.xml |  6 +
 repl/pom.xml| 12 +
 resource-managers/yarn/pom.xml  |  2 ++
 sql/catalyst/pom.xml| 12 +
 sql/core/pom.xml| 12 +
 sql/hive-thriftserver/pom.xml   | 12 +
 sql/hive/pom.xml|  2 ++
 streaming/pom.xml   | 11 
 33 files changed, 352 insertions(+), 89 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/afd9bc1d/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index b63c0ca..8657af7 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -91,6 +91,18 @@
   org.apache.spark
   spark-tags_${scala.binary.version}
 
+
+
+
+  org.apache.spark
+  spark-tags_${scala.binary.version}
+  test-jar
+  test
+
+
 
   org.mockito
   mockito-core

http://git-wip-us.apache.org/repos/asf/spark/blob/afd9bc1d/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 5fc92af..24c10fb 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -70,6 +70,18 @@
   org.apache.spark
   spark-tags_${scala.binary.version}
 
+
+
+
+  org.apache.spark
+  spark-tags_${scala.binary.version}
+  test-jar
+  test
+
+
 
   log4j
   log4j

http://git-wip-us.apache.org/repos/asf/spark/blob/afd9bc1d/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 9fcd636..5e5a80b 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -50,6 +50,17 @@
   spark-tags_${scala.binary.version}
 
 
+
+
+  org.apache.spark
+  spark-tags_${scala.binary.version}
+  test-jar
+  test
+
+
 
 
   org.apache.hadoop

http://git-wip-us.apache.org/repos/asf/spark/

spark git commit: [SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix the flaky test

2016-12-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 0e51bb085 -> 17ef57fe8


[SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix 
the flaky test

## What changes were proposed in this pull request?

When KafkaSource fails on Kafka errors, we should create a new consumer to retry, rather than reuse the existing broken one, because it's possible that the broken one will simply fail again.

This PR also assigns a new group id to the newly created consumer to guard against a possible race condition: the broken consumer cannot talk to the Kafka cluster in `close`, but the new consumer can. I'm not sure whether this can actually happen; it is just a safety measure so that the Kafka cluster never sees two consumers with the same group id in a short time window. (Note: CachedKafkaConsumer doesn't need this fix since `assign` never uses the group id.)
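
A self-contained sketch of the retry pattern described above, not the actual `KafkaSource` code; the class name, the single retry, and the assumption that `baseParams` already carries bootstrap servers and byte-array deserializers are illustrative only:

```Scala
// Illustrative only: recreate the consumer with a fresh group id when a call fails,
// so a half-closed broken consumer can never collide with its replacement.
import java.{util => ju}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

class ResettableConsumer(baseParams: ju.Map[String, Object], groupIdPrefix: String) {
  private var nextId = 0
  private var consumer = createConsumer()

  // Every instance gets a unique group id, e.g. "prefix-0", "prefix-1", ...
  private def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
    val params = new ju.HashMap[String, Object](baseParams)
    params.put(ConsumerConfig.GROUP_ID_CONFIG, s"$groupIdPrefix-$nextId")
    nextId += 1
    new KafkaConsumer[Array[Byte], Array[Byte]](params)
  }

  // Run `body`; on failure, discard the broken consumer, build a new one, retry once.
  def runWithReset[T](body: KafkaConsumer[Array[Byte], Array[Byte]] => T): T = {
    try body(consumer) catch {
      case _: Exception =>
        try consumer.close() catch { case _: Exception => () }
        consumer = createConsumer()
        body(consumer)
    }
  }
}
```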

## How was this patch tested?

In https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/70370/console, this flaky test was run 120 times and all runs passed.

Author: Shixiong Zhu 

Closes #16282 from zsxwing/kafka-fix.

(cherry picked from commit 95efc895e929701a605313b87ad0cd91edee2f81)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17ef57fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17ef57fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17ef57fe

Branch: refs/heads/branch-2.1
Commit: 17ef57fe8dab7616200fdd9c00ff29f716459321
Parents: 0e51bb0
Author: Shixiong Zhu 
Authored: Wed Dec 21 15:39:36 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 21 15:39:54 2016 -0800

--
 dev/sparktestsupport/modules.py |  3 +-
 .../apache/spark/sql/kafka010/KafkaSource.scala | 58 ++--
 .../sql/kafka010/KafkaSourceProvider.scala  | 21 +++
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |  2 +-
 4 files changed, 52 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/17ef57fe/dev/sparktestsupport/modules.py
--
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index b34ab51..0cf078c 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -245,7 +245,8 @@ streaming_kafka_0_10 = Module(
 name="streaming-kafka-0-10",
 dependencies=[streaming],
 source_file_regexes=[
-"external/kafka-0-10",
+# The ending "/" is necessary otherwise it will include "sql-kafka" 
codes
+"external/kafka-0-10/",
 "external/kafka-0-10-assembly",
 ],
 sbt_test_goals=[

http://git-wip-us.apache.org/repos/asf/spark/blob/17ef57fe/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 92ee0ed..43b8d9d 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -24,7 +24,7 @@ import java.nio.charset.StandardCharsets
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer, 
OffsetOutOfRangeException}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer, OffsetOutOfRangeException}
 import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
@@ -81,14 +81,16 @@ import org.apache.spark.util.UninterruptibleThread
  * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
  * and not use wrong broker addresses.
  */
-private[kafka010] case class KafkaSource(
+private[kafka010] class KafkaSource(
 sqlContext: SQLContext,
 consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
 executorKafkaParams: ju.Map[String, Object],
 sourceOptions: Map[String, String],
 metadataPath: String,
 startingOffsets: StartingOffsets,
-failOnDataLoss: Boolean)
+failOnDataLoss: Boolean,
+driverGroupIdPrefix: String)
   extends Source with Logging {
 
   private val sc = sqlContext.sparkContext
@@ -107,11 +109,31 @@ private[kafka010] case class KafkaSource(
   private val maxOffsetsPerTrigger =
 sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
 
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+

spark git commit: [SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix the flaky test

2016-12-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 354e93618 -> 95efc895e


[SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix 
the flaky test

## What changes were proposed in this pull request?

When KafkaSource fails on Kafka errors, we should create a new consumer to retry, rather than reuse the existing broken one, because it's possible that the broken one will simply fail again.

This PR also assigns a new group id to the newly created consumer to guard against a possible race condition: the broken consumer cannot talk to the Kafka cluster in `close`, but the new consumer can. I'm not sure whether this can actually happen; it is just a safety measure so that the Kafka cluster never sees two consumers with the same group id in a short time window. (Note: CachedKafkaConsumer doesn't need this fix since `assign` never uses the group id.)

## How was this patch tested?

In https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/70370/console, this flaky test was run 120 times and all runs passed.

Author: Shixiong Zhu 

Closes #16282 from zsxwing/kafka-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95efc895
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95efc895
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95efc895

Branch: refs/heads/master
Commit: 95efc895e929701a605313b87ad0cd91edee2f81
Parents: 354e936
Author: Shixiong Zhu 
Authored: Wed Dec 21 15:39:36 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 21 15:39:36 2016 -0800

--
 dev/sparktestsupport/modules.py |  3 +-
 .../apache/spark/sql/kafka010/KafkaSource.scala | 58 ++--
 .../sql/kafka010/KafkaSourceProvider.scala  | 21 +++
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |  2 +-
 4 files changed, 52 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/95efc895/dev/sparktestsupport/modules.py
--
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 1a7cf9a..10ad1fe 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -245,7 +245,8 @@ streaming_kafka_0_10 = Module(
 name="streaming-kafka-0-10",
 dependencies=[streaming],
 source_file_regexes=[
-"external/kafka-0-10",
+# The ending "/" is necessary otherwise it will include "sql-kafka" 
codes
+"external/kafka-0-10/",
 "external/kafka-0-10-assembly",
 ],
 sbt_test_goals=[

http://git-wip-us.apache.org/repos/asf/spark/blob/95efc895/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 92ee0ed..43b8d9d 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -24,7 +24,7 @@ import java.nio.charset.StandardCharsets
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer, 
OffsetOutOfRangeException}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer, OffsetOutOfRangeException}
 import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
@@ -81,14 +81,16 @@ import org.apache.spark.util.UninterruptibleThread
  * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
  * and not use wrong broker addresses.
  */
-private[kafka010] case class KafkaSource(
+private[kafka010] class KafkaSource(
 sqlContext: SQLContext,
 consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
 executorKafkaParams: ju.Map[String, Object],
 sourceOptions: Map[String, String],
 metadataPath: String,
 startingOffsets: StartingOffsets,
-failOnDataLoss: Boolean)
+failOnDataLoss: Boolean,
+driverGroupIdPrefix: String)
   extends Source with Logging {
 
   private val sc = sqlContext.sparkContext
@@ -107,11 +109,31 @@ private[kafka010] case class KafkaSource(
   private val maxOffsetsPerTrigger =
 sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
 
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
   /**
* A KafkaConsum

spark git commit: [SPARK-18775][SQL] Limit the max number of records written per file

2016-12-21 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 078c71c2d -> 354e93618


[SPARK-18775][SQL] Limit the max number of records written per file

## What changes were proposed in this pull request?
Currently, Spark writes a single file out per task, sometimes leading to very 
large files. It would be great to have an option to limit the max number of 
records written per file in a task, to avoid humongous files.

This patch introduces a new write option, `maxRecordsPerFile` (defaulting to the session-wide setting `spark.sql.files.maxRecordsPerFile`), that limits the maximum number of records written to a single file. A non-positive value means there is no limit (the same behavior as not setting the option).
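
For illustration, a spark-shell style sketch of how the option might be used; the output path, row count, and the cap of 10,000 records are placeholders:

```Scala
// Assumes a spark-shell session where `spark` is the active SparkSession.
val df = spark.range(0, 1000000L).toDF("id")

df.write
  .option("maxRecordsPerFile", 10000L)   // per-write option introduced by this patch
  .parquet("/tmp/capped_output")

// Or set the session-wide default that the per-write option falls back to.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 10000L)
```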

## How was this patch tested?
Added test cases in PartitionedWriteSuite for both dynamic partition insert and 
non-dynamic partition insert.

Author: Reynold Xin 

Closes #16204 from rxin/SPARK-18775.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/354e9361
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/354e9361
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/354e9361

Branch: refs/heads/master
Commit: 354e936187708a404c0349e3d8815a47953123ec
Parents: 078c71c
Author: Reynold Xin 
Authored: Wed Dec 21 23:50:35 2016 +0100
Committer: Herman van Hovell 
Committed: Wed Dec 21 23:50:35 2016 +0100

--
 .../datasources/FileFormatWriter.scala  | 109 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala |  26 +++--
 .../datasources/BucketingUtilsSuite.scala   |  46 
 .../sql/sources/PartitionedWriteSuite.scala |  37 +++
 4 files changed, 179 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/354e9361/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index d560ad5..1eb4541 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -31,13 +31,12 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution, 
UnsafeKVExternalSorter}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -47,6 +46,13 @@ import 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 /** A helper object for writing FileFormat data out to a location. */
 object FileFormatWriter extends Logging {
 
+  /**
+   * Max number of files a single task writes out due to file size. In most 
cases the number of
+   * files written should be very small. This is just a safe guard to protect 
some really bad
+   * settings, e.g. maxRecordsPerFile = 1.
+   */
+  private val MAX_FILE_COUNTER = 1000 * 1000
+
   /** Describes how output files should be placed in the filesystem. */
   case class OutputSpec(
 outputPath: String, customPartitionLocations: Map[TablePartitionSpec, 
String])
@@ -61,7 +67,8 @@ object FileFormatWriter extends Logging {
   val nonPartitionColumns: Seq[Attribute],
   val bucketSpec: Option[BucketSpec],
   val path: String,
-  val customPartitionLocations: Map[TablePartitionSpec, String])
+  val customPartitionLocations: Map[TablePartitionSpec, String],
+  val maxRecordsPerFile: Long)
 extends Serializable {
 
 assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ 
nonPartitionColumns),
@@ -116,7 +123,10 @@ object FileFormatWriter extends Logging {
   nonPartitionColumns = dataColumns,
   bucketSpec = bucketSpec,
   path = outputSpec.outputPath,
-  customPartitionLocations = outputSpec.customPartitionLocations)
+  customPartitionLocations = outputSpec.customPartitionLocations,
+  maxRecordsPer

spark git commit: [SPARK-18949][SQL][BACKPORT-2.1] Add recoverPartitions API to Catalog

2016-12-21 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 318483421 -> 0e51bb085


[SPARK-18949][SQL][BACKPORT-2.1] Add recoverPartitions API to Catalog

### What changes were proposed in this pull request?

This PR is to backport https://github.com/apache/spark/pull/16356 to the Spark 2.1.1 branch.



Currently, we only have a SQL interface for recovering all the partitions in the directory of a table and updating the catalog: `MSCK REPAIR TABLE` or `ALTER TABLE table RECOVER PARTITIONS`. (It is actually very hard to remember `MSCK`, and not obvious what it stands for.)

After the new "Scalable Partition Handling", table repair becomes much more important for making the data in a newly created data-source partitioned table visible.

Thus, this PR adds it to the Catalog interface. After this PR, users can repair the table by
```Scala
spark.catalog.recoverPartitions("testTable")
```
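
For comparison, the pre-existing SQL route mentioned above, which this API call is meant to match; the table name is a placeholder:

```Scala
spark.sql("MSCK REPAIR TABLE testTable")
// or, equivalently
spark.sql("ALTER TABLE testTable RECOVER PARTITIONS")
```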

### How was this patch tested?
Modified the existing test cases.

Author: gatorsmile 

Closes #16372 from gatorsmile/repairTable2.1.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e51bb08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e51bb08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e51bb08

Branch: refs/heads/branch-2.1
Commit: 0e51bb085446a482c22eaef93aea513610f41f48
Parents: 3184834
Author: gatorsmile 
Authored: Wed Dec 21 13:55:40 2016 -0800
Committer: Reynold Xin 
Committed: Wed Dec 21 13:55:40 2016 -0800

--
 project/MimaExcludes.scala|  5 -
 python/pyspark/sql/catalog.py |  5 +
 .../scala/org/apache/spark/sql/catalog/Catalog.scala  |  7 +++
 .../org/apache/spark/sql/internal/CatalogImpl.scala   | 14 ++
 .../hive/PartitionProviderCompatibilitySuite.scala|  6 +++---
 5 files changed, 33 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 978a328..6d1b4d2 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -110,7 +110,10 @@ object MimaExcludes {
   
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
   
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
   
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
-  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
+  
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query"),
+
+  // [SPARK-18949] [SQL] Add repairTable API to Catalog
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions")
 )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/python/pyspark/sql/catalog.py
--
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index a36d02e..30c7a3f 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -258,6 +258,11 @@ class Catalog(object):
 """Invalidate and refresh all the cached metadata of the given 
table."""
 self._jcatalog.refreshTable(tableName)
 
+@since('2.1.1')
+def recoverPartitions(self, tableName):
+"""Recover all the partitions of the given table and update the 
catalog."""
+self._jcatalog.recoverPartitions(tableName)
+
 def _reset(self):
 """(Internal use only) Drop all existing databases (except "default"), 
tables,
 partitions and functions, and set the current database to "default".

http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index aecdda1..6b061f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -301,6 +301,13 @@ abstract class Catalog {
   def dropGlobalTempView(viewName: String): Boolean
 
   /**
+   * Recover all the partitions in the directory of a table and update the 
catalog.
+   *
+   * @since 2.1.1
+   */
+  def recoverPartitions(tableName: String): Unit
+
+  /**
* Returns true 

spark git commit: [SPARK-18700][SQL][BACKPORT-2.0] Add StripedLock for each table's relation in cache

2016-12-21 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5f8c0b742 -> 53cd99f65


[SPARK-18700][SQL][BACKPORT-2.0] Add StripedLock for each table's relation in 
cache

## What changes were proposed in this pull request?

Backport of #16135 to branch-2.0

## How was this patch tested?

Because of the differences between branch-2.0 and master/2.1, this backport adds a multi-threaded table-access test in `HiveMetadataCacheSuite` and uses the metrics in `HiveCatalogMetrics` to check that the relation is loaded only once.
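
A stripped-down sketch of the striped-lock pattern this patch applies, outside the real catalog code; the cache type, key type, and loader are simplified assumptions:

```Scala
// Simplified illustration only; the real change lives in HiveMetastoreCatalog.
import com.google.common.util.concurrent.Striped

object TableLoadSketch {
  // 100 lock stripes shared across all table names, as in the patch.
  private val tableCreationLocks = Striped.lazyWeakLock(100)
  private val cache = new java.util.concurrent.ConcurrentHashMap[String, AnyRef]()

  // Hold the stripe for `key` for the duration of `f`, so at most one thread
  // instantiates the (expensive) relation for a given table at a time.
  private def withTableCreationLock[A](key: String)(f: => A): A = {
    val lock = tableCreationLocks.get(key)
    lock.lock()
    try f finally lock.unlock()
  }

  def getOrLoad(key: String)(load: => AnyRef): AnyRef = withTableCreationLock(key) {
    val cached = cache.get(key)
    if (cached != null) {
      cached
    } else {
      val created = load   // runs at most once per key under concurrent access
      cache.put(key, created)
      created
    }
  }
}
```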

Author: xuanyuanking 

Closes #16350 from xuanyuanking/SPARK-18700-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53cd99f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53cd99f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53cd99f6

Branch: refs/heads/branch-2.0
Commit: 53cd99f65667c4d49db000101460a9d266f199e8
Parents: 5f8c0b7
Author: xuanyuanking 
Authored: Wed Dec 21 22:55:42 2016 +0100
Committer: Herman van Hovell 
Committed: Wed Dec 21 22:55:42 2016 +0100

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 147 +++
 1 file changed, 82 insertions(+), 65 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/53cd99f6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e7d1ed3..670400f 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.collection.JavaConverters._
 
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -65,6 +66,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   t.identifier.table.toLowerCase)
   }
 
+  /** These locks guard against multiple attempts to instantiate a table, 
which wastes memory. */
+  private val tableCreationLocks = Striped.lazyWeakLock(100)
+
+  /** Acquires a lock on the table cache for the duration of `f`. */
+  private def withTableCreationLock[A](tableName: QualifiedTableName, f: => 
A): A = {
+val lock = tableCreationLocks.get(tableName)
+lock.lock()
+try f finally {
+  lock.unlock()
+}
+  }
+
   /** A cache of Spark SQL data source tables that have been accessed. */
   protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, 
LogicalPlan] = {
 val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
@@ -274,77 +287,81 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 partitionPaths
   }
 
-  val cached = getCached(
-tableIdentifier,
-paths,
-metastoreRelation,
-metastoreSchema,
-fileFormatClass,
-bucketSpec,
-Some(partitionSpec))
-
-  val hadoopFsRelation = cached.getOrElse {
-val fileCatalog = new MetaStorePartitionedTableFileCatalog(
-  sparkSession,
-  new Path(metastoreRelation.catalogTable.storage.locationUri.get),
-  partitionSpec)
-
-val inferredSchema = if (fileType.equals("parquet")) {
-  val inferredSchema =
-defaultSource.inferSchema(sparkSession, options, 
fileCatalog.allFiles())
-  inferredSchema.map { inferred =>
-ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, 
inferred)
-  }.getOrElse(metastoreSchema)
-} else {
-  defaultSource.inferSchema(sparkSession, options, 
fileCatalog.allFiles()).get
-}
+  withTableCreationLock(tableIdentifier, {
+val cached = getCached(
+  tableIdentifier,
+  paths,
+  metastoreRelation,
+  metastoreSchema,
+  fileFormatClass,
+  bucketSpec,
+  Some(partitionSpec))
+
+val hadoopFsRelation = cached.getOrElse {
+  val fileCatalog = new MetaStorePartitionedTableFileCatalog(
+sparkSession,
+new Path(metastoreRelation.catalogTable.storage.locationUri.get),
+partitionSpec)
+
+  val inferredSchema = if (fileType.equals("parquet")) {
+val inferredSchema =
+  defaultSource.inferSchema(sparkSession, options, 
fileCatalog.allFiles())
+inferredSchema.map { inferred =>
+  ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, 
inferred)
+}.getOrElse(metastoreSchema)
+   

spark git commit: [SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window

2016-12-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ef206ace2 -> 5f8c0b742


[SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd 
cleanup - map and window

## What changes were proposed in this pull request?

The issue in this test is that the cleanup of RDDs may not finish before the StreamingContext is stopped. This PR basically just puts the assertions into `eventually` and runs them before stopping the StreamingContext.
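
The shape of the fix, sketched with a placeholder condition instead of the real RDD-cleanup checks; `cleanupFinished` and the background thread are purely illustrative:

```Scala
import java.util.concurrent.atomic.AtomicBoolean
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

val cleanupFinished = new AtomicBoolean(false)   // stand-in for "old RDDs were cleaned up"
new Thread(new Runnable {
  override def run(): Unit = { Thread.sleep(500); cleanupFinished.set(true) }
}).start()

// Assert inside `eventually`, before stopping the StreamingContext, so that
// asynchronous cleanup that has not finished yet no longer fails the test.
eventually(timeout(10.seconds)) {
  assert(cleanupFinished.get())   // retried until it holds or the timeout expires
}
```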

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16362 from zsxwing/SPARK-18954.

(cherry picked from commit 078c71c2dcbb1470d22f8eb8138fb17e3d7c2414)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f8c0b74
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f8c0b74
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f8c0b74

Branch: refs/heads/branch-2.0
Commit: 5f8c0b742d9a6bc82dd257075ba4c58b1257a476
Parents: ef206ac
Author: Shixiong Zhu 
Authored: Wed Dec 21 11:59:21 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 21 11:59:38 2016 -0800

--
 .../spark/streaming/BasicOperationsSuite.scala  | 98 
 .../apache/spark/streaming/TestSuiteBase.scala  | 19 +++-
 2 files changed, 73 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5f8c0b74/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index cfcbdc7..4a925fc 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming
 
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 import scala.reflect.ClassTag
 
+import org.scalatest.concurrent.Eventually.eventually
+
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
@@ -591,48 +591,57 @@ class BasicOperationsSuite extends TestSuiteBase {
.window(Seconds(4), Seconds(2))
 }
 
-val operatedStream = runCleanupTest(conf, operation _,
-  numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = 
Seconds(3))
-val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
-val windowedStream1 = 
windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
-val mappedStream = windowedStream1.dependencies.head
-
-// Checkpoint remember durations
-assert(windowedStream2.rememberDuration === rememberDuration)
-assert(windowedStream1.rememberDuration === rememberDuration + 
windowedStream2.windowDuration)
-assert(mappedStream.rememberDuration ===
-  rememberDuration + windowedStream2.windowDuration + 
windowedStream1.windowDuration)
-
-// WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
-// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
-// MappedStream should remember till 2 seconds:10, 9, 8, 7, 6, 5, 4, 
3, 2
-
-// WindowedStream2
-assert(windowedStream2.generatedRDDs.contains(Time(1)))
-assert(windowedStream2.generatedRDDs.contains(Time(8000)))
-assert(!windowedStream2.generatedRDDs.contains(Time(6000)))
-
-// WindowedStream1
-assert(windowedStream1.generatedRDDs.contains(Time(1)))
-assert(windowedStream1.generatedRDDs.contains(Time(4000)))
-assert(!windowedStream1.generatedRDDs.contains(Time(3000)))
-
-// MappedStream
-assert(mappedStream.generatedRDDs.contains(Time(1)))
-assert(mappedStream.generatedRDDs.contains(Time(2000)))
-assert(!mappedStream.generatedRDDs.contains(Time(1000)))
+runCleanupTest(
+conf,
+operation _,
+numExpectedOutput = cleanupTestInput.size / 2,
+rememberDuration = Seconds(3)) { operatedStream =>
+  eventually(eventuallyTimeout) {
+val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+val windowedStream1 = 
windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
+val mappedStream = windowedStream1.dependencies.head
+
+// Checkpoint remember durations
+assert(windowedStream2.rememberDuration === rememberDuration)
+assert(
+  windowedStream1.rememberDuration === rememberDurati

spark git commit: [SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window

2016-12-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 162bdb910 -> 318483421


[SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd 
cleanup - map and window

## What changes were proposed in this pull request?

The issue in this test is that the cleanup of RDDs may not finish before the StreamingContext is stopped. This PR basically just puts the assertions into `eventually` and runs them before stopping the StreamingContext.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16362 from zsxwing/SPARK-18954.

(cherry picked from commit 078c71c2dcbb1470d22f8eb8138fb17e3d7c2414)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31848342
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31848342
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31848342

Branch: refs/heads/branch-2.1
Commit: 318483421adc3c2e22744c4c580917377ce40b3f
Parents: 162bdb9
Author: Shixiong Zhu 
Authored: Wed Dec 21 11:59:21 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 21 11:59:28 2016 -0800

--
 .../spark/streaming/BasicOperationsSuite.scala  | 98 
 .../apache/spark/streaming/TestSuiteBase.scala  | 19 +++-
 2 files changed, 73 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/31848342/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 4e702bb..a3062ac 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming
 
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 import scala.reflect.ClassTag
 
+import org.scalatest.concurrent.Eventually.eventually
+
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
@@ -657,48 +657,57 @@ class BasicOperationsSuite extends TestSuiteBase {
.window(Seconds(4), Seconds(2))
 }
 
-val operatedStream = runCleanupTest(conf, operation _,
-  numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = 
Seconds(3))
-val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
-val windowedStream1 = 
windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
-val mappedStream = windowedStream1.dependencies.head
-
-// Checkpoint remember durations
-assert(windowedStream2.rememberDuration === rememberDuration)
-assert(windowedStream1.rememberDuration === rememberDuration + 
windowedStream2.windowDuration)
-assert(mappedStream.rememberDuration ===
-  rememberDuration + windowedStream2.windowDuration + 
windowedStream1.windowDuration)
-
-// WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
-// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
-// MappedStream should remember till 2 seconds:10, 9, 8, 7, 6, 5, 4, 
3, 2
-
-// WindowedStream2
-assert(windowedStream2.generatedRDDs.contains(Time(1)))
-assert(windowedStream2.generatedRDDs.contains(Time(8000)))
-assert(!windowedStream2.generatedRDDs.contains(Time(6000)))
-
-// WindowedStream1
-assert(windowedStream1.generatedRDDs.contains(Time(1)))
-assert(windowedStream1.generatedRDDs.contains(Time(4000)))
-assert(!windowedStream1.generatedRDDs.contains(Time(3000)))
-
-// MappedStream
-assert(mappedStream.generatedRDDs.contains(Time(1)))
-assert(mappedStream.generatedRDDs.contains(Time(2000)))
-assert(!mappedStream.generatedRDDs.contains(Time(1000)))
+runCleanupTest(
+conf,
+operation _,
+numExpectedOutput = cleanupTestInput.size / 2,
+rememberDuration = Seconds(3)) { operatedStream =>
+  eventually(eventuallyTimeout) {
+val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+val windowedStream1 = 
windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
+val mappedStream = windowedStream1.dependencies.head
+
+// Checkpoint remember durations
+assert(windowedStream2.rememberDuration === rememberDuration)
+assert(
+  windowedStream1.rememberDuration === rememberDurati

spark git commit: [SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window

2016-12-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master ccfe60a83 -> 078c71c2d


[SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd 
cleanup - map and window

## What changes were proposed in this pull request?

The issue in this test is that the cleanup of RDDs may not finish before the StreamingContext is stopped. This PR basically just puts the assertions into `eventually` and runs them before stopping the StreamingContext.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16362 from zsxwing/SPARK-18954.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/078c71c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/078c71c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/078c71c2

Branch: refs/heads/master
Commit: 078c71c2dcbb1470d22f8eb8138fb17e3d7c2414
Parents: ccfe60a
Author: Shixiong Zhu 
Authored: Wed Dec 21 11:59:21 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 21 11:59:21 2016 -0800

--
 .../spark/streaming/BasicOperationsSuite.scala  | 98 
 .../apache/spark/streaming/TestSuiteBase.scala  | 19 +++-
 2 files changed, 73 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/078c71c2/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 4e702bb..a3062ac 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming
 
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 import scala.reflect.ClassTag
 
+import org.scalatest.concurrent.Eventually.eventually
+
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
@@ -657,48 +657,57 @@ class BasicOperationsSuite extends TestSuiteBase {
.window(Seconds(4), Seconds(2))
 }
 
-val operatedStream = runCleanupTest(conf, operation _,
-  numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = 
Seconds(3))
-val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
-val windowedStream1 = 
windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
-val mappedStream = windowedStream1.dependencies.head
-
-// Checkpoint remember durations
-assert(windowedStream2.rememberDuration === rememberDuration)
-assert(windowedStream1.rememberDuration === rememberDuration + 
windowedStream2.windowDuration)
-assert(mappedStream.rememberDuration ===
-  rememberDuration + windowedStream2.windowDuration + 
windowedStream1.windowDuration)
-
-// WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
-// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
-// MappedStream should remember till 2 seconds:10, 9, 8, 7, 6, 5, 4, 
3, 2
-
-// WindowedStream2
-assert(windowedStream2.generatedRDDs.contains(Time(1)))
-assert(windowedStream2.generatedRDDs.contains(Time(8000)))
-assert(!windowedStream2.generatedRDDs.contains(Time(6000)))
-
-// WindowedStream1
-assert(windowedStream1.generatedRDDs.contains(Time(1)))
-assert(windowedStream1.generatedRDDs.contains(Time(4000)))
-assert(!windowedStream1.generatedRDDs.contains(Time(3000)))
-
-// MappedStream
-assert(mappedStream.generatedRDDs.contains(Time(1)))
-assert(mappedStream.generatedRDDs.contains(Time(2000)))
-assert(!mappedStream.generatedRDDs.contains(Time(1000)))
+runCleanupTest(
+conf,
+operation _,
+numExpectedOutput = cleanupTestInput.size / 2,
+rememberDuration = Seconds(3)) { operatedStream =>
+  eventually(eventuallyTimeout) {
+val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+val windowedStream1 = 
windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
+val mappedStream = windowedStream1.dependencies.head
+
+// Checkpoint remember durations
+assert(windowedStream2.rememberDuration === rememberDuration)
+assert(
+  windowedStream1.rememberDuration === rememberDuration + 
windowedStream2.windowDuration)
+assert(mappedStream.rememberDuration ===
+  remember

spark git commit: [SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality

2016-12-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3c8861d92 -> 162bdb910


[SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic 
functionality

## What changes were proposed in this pull request?

The failure is because `test("basic functionality")` does not block until `ExecutorAllocationManager.manageAllocation` is called. This PR just adds a StreamManualClock so the test can block on the expected wait time, making it deterministic.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16321 from zsxwing/SPARK-18031.

(cherry picked from commit ccfe60a8304871779ff1b31b8c2d724f59d5b2af)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/162bdb91
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/162bdb91
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/162bdb91

Branch: refs/heads/branch-2.1
Commit: 162bdb9103ecba99cd73004ede4d55ff8fc8
Parents: 3c8861d
Author: Shixiong Zhu 
Authored: Wed Dec 21 11:17:44 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 21 11:17:53 2016 -0800

--
 .../ExecutorAllocationManagerSuite.scala| 36 +---
 1 file changed, 32 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/162bdb91/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index b49e579..1d2bf35 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
 
   private val batchDurationMillis = 1000L
   private var allocationClient: ExecutorAllocationClient = null
-  private var clock: ManualClock = null
+  private var clock: StreamManualClock = null
 
   before {
 allocationClient = mock[ExecutorAllocationClient]
-clock = new ManualClock()
+clock = new StreamManualClock()
   }
 
   test("basic functionality") {
@@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
 reset(allocationClient)
 when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
 addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
-clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1)
+val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1
+val expectedWaitTime = clock.getTimeMillis() + advancedTime
+clock.advance(advancedTime)
+// Make sure ExecutorAllocationManager.manageAllocation is called
 eventually(timeout(10 seconds)) {
-  body
+  assert(clock.isStreamWaitingAt(expectedWaitTime))
 }
+body
   }
 
   /** Verify that the expected number of total executor were requested */
@@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
 }
   }
 }
+
+/**
+ * A special manual clock that provide `isStreamWaitingAt` to allow the user 
to check if the clock
+ * is blocking.
+ */
+class StreamManualClock(time: Long = 0L) extends ManualClock(time) with 
Serializable {
+  private var waitStartTime: Option[Long] = None
+
+  override def waitTillTime(targetTime: Long): Long = synchronized {
+try {
+  waitStartTime = Some(getTimeMillis())
+  super.waitTillTime(targetTime)
+} finally {
+  waitStartTime = None
+}
+  }
+
+  /**
+   * Returns if the clock is blocking and the time it started to block is the 
parameter `time`.
+   */
+  def isStreamWaitingAt(time: Long): Boolean = synchronized {
+waitStartTime == Some(time)
+  }
+}





spark git commit: [SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality

2016-12-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 607a1e63d -> ccfe60a83


[SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic 
functionality

## What changes were proposed in this pull request?

The failure is because `test("basic functionality")` does not block until `ExecutorAllocationManager.manageAllocation` is called. This PR just adds a StreamManualClock so the test can block on the expected wait time, making it deterministic.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16321 from zsxwing/SPARK-18031.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccfe60a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccfe60a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccfe60a8

Branch: refs/heads/master
Commit: ccfe60a8304871779ff1b31b8c2d724f59d5b2af
Parents: 607a1e6
Author: Shixiong Zhu 
Authored: Wed Dec 21 11:17:44 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 21 11:17:44 2016 -0800

--
 .../ExecutorAllocationManagerSuite.scala| 36 +---
 1 file changed, 32 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ccfe60a8/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index b49e579..1d2bf35 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
 
   private val batchDurationMillis = 1000L
   private var allocationClient: ExecutorAllocationClient = null
-  private var clock: ManualClock = null
+  private var clock: StreamManualClock = null
 
   before {
 allocationClient = mock[ExecutorAllocationClient]
-clock = new ManualClock()
+clock = new StreamManualClock()
   }
 
   test("basic functionality") {
@@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
 reset(allocationClient)
 when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
 addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
-clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1)
+val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1
+val expectedWaitTime = clock.getTimeMillis() + advancedTime
+clock.advance(advancedTime)
+// Make sure ExecutorAllocationManager.manageAllocation is called
 eventually(timeout(10 seconds)) {
-  body
+  assert(clock.isStreamWaitingAt(expectedWaitTime))
 }
+body
   }
 
   /** Verify that the expected number of total executor were requested */
@@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
 }
   }
 }
+
+/**
+ * A special manual clock that provide `isStreamWaitingAt` to allow the user 
to check if the clock
+ * is blocking.
+ */
+class StreamManualClock(time: Long = 0L) extends ManualClock(time) with 
Serializable {
+  private var waitStartTime: Option[Long] = None
+
+  override def waitTillTime(targetTime: Long): Long = synchronized {
+try {
+  waitStartTime = Some(getTimeMillis())
+  super.waitTillTime(targetTime)
+} finally {
+  waitStartTime = None
+}
+  }
+
+  /**
+   * Returns if the clock is blocking and the time it started to block is the 
parameter `time`.
+   */
+  def isStreamWaitingAt(time: Long): Boolean = synchronized {
+waitStartTime == Some(time)
+  }
+}





spark git commit: [SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years

2016-12-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 bc54a14b4 -> 3c8861d92


[SPARK-18894][SS] Fix event time watermark delay threshold specified in months 
or years

## What changes were proposed in this pull request?

Two changes:
- Fix how delays specified in months and years are translated to milliseconds (a small sketch follows after this list)
- Following up on #16258, do not show the watermark when there is no watermarking in the query
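
A small spark-shell sketch of the conversion behind the first bullet, using the same 31-day month approximation the patch uses; the interval string is a placeholder:

```Scala
// How a calendar-interval delay such as "1 month 3 days" becomes milliseconds.
import org.apache.spark.unsafe.types.CalendarInterval

val delay = CalendarInterval.fromString("interval 1 month 3 days")
val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31   // 31-day month approximation
val delayMs = delay.milliseconds + delay.months * millisPerMonth   // matches EventTimeWatermarkExec
```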

## How was this patch tested?
Updated and new unit tests

Author: Tathagata Das 

Closes #16304 from tdas/SPARK-18834-1.

(cherry picked from commit 607a1e63dbc9269b806a9f537e1d041029333cdd)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c8861d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c8861d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c8861d9

Branch: refs/heads/branch-2.1
Commit: 3c8861d924e42ff84615044930fc5531201b9b12
Parents: bc54a14
Author: Tathagata Das 
Authored: Wed Dec 21 10:44:20 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 21 10:44:27 2016 -0800

--
 .../streaming/EventTimeWatermarkExec.scala  |   7 +-
 .../execution/streaming/ProgressReporter.scala  |   7 +-
 .../execution/streaming/StreamExecution.scala   |   2 +-
 .../sql/streaming/EventTimeWatermarkSuite.scala | 287 +++
 .../spark/sql/streaming/WatermarkSuite.scala| 240 
 5 files changed, 299 insertions(+), 244 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index e8570d0..5a9a99e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,6 +84,11 @@ case class EventTimeWatermarkExec(
 child: SparkPlan) extends SparkPlan {
 
   val eventTimeStats = new EventTimeStatsAccum()
+  val delayMs = {
+val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+delay.milliseconds + delay.months * millisPerMonth
+  }
+
   sparkContext.register(eventTimeStats)
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -101,7 +106,7 @@ case class EventTimeWatermarkExec(
 if (a semanticEquals eventTime) {
   val updatedMetadata = new MetadataBuilder()
   .withMetadata(a.metadata)
-  .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+  .putLong(EventTimeWatermark.delayKey, delayMs)
   .build()
 
   a.withMetadata(updatedMetadata)
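
To see why the new `delayMs` matters: previously only `delay.milliseconds` was written into the
metadata, so a watermark delay given in months or years contributed nothing. A quick worked check
of the patch's formula is below, assuming `CalendarInterval.MICROS_PER_DAY` is
24 * 60 * 60 * 1000 * 1000 and that a year is carried as 12 months; this is an illustration, not
test code from the commit.

```scala
// Worked example of the delayMs formula from the hunk above.
object DelayMsCheck {
  def main(args: Array[String]): Unit = {
    val microsPerDay = 24L * 60 * 60 * 1000 * 1000        // assumed value of CalendarInterval.MICROS_PER_DAY
    val millisPerMonth = microsPerDay / 1000 * 31          // 2,678,400,000 ms per "month"
    // "1 year" is carried as months = 12, milliseconds = 0 in the interval (assumption).
    val months = 12
    val milliseconds = 0L
    val delayMs = milliseconds + months * millisPerMonth
    println(delayMs)                                       // 32140800000 ms
    println(delayMs / (microsPerDay / 1000))               // 372 days
  }
}
```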

http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 2386f33..c5e9eae 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, 
LogicalPlan}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -182,7 +182,10 @@ trait ProgressReporter extends Logging {
 
   /** Extracts statistics from the most recent query execution. */
   private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
-val watermarkTimestamp = Map("watermark" -> 
formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e 
}.nonEmpty
+val watermarkTimestamp =
+  if (hasEventTime) Map("watermark" -> 
formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+  else Map.empty[String, String]
 
 if (!hasNewData) {
   return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.sc

spark git commit: [SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years

2016-12-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 1a6438897 -> 607a1e63d


[SPARK-18894][SS] Fix event time watermark delay threshold specified in months 
or years

## What changes were proposed in this pull request?

Two changes
- Fix how delays specified in months and years are translated to milliseconds
- Following up on #16258, do not show the watermark when there is no watermarking in 
the query

## How was this patch tested?
Updated and new unit tests

Author: Tathagata Das 

Closes #16304 from tdas/SPARK-18834-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/607a1e63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/607a1e63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/607a1e63

Branch: refs/heads/master
Commit: 607a1e63dbc9269b806a9f537e1d041029333cdd
Parents: 1a64388
Author: Tathagata Das 
Authored: Wed Dec 21 10:44:20 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 21 10:44:20 2016 -0800

--
 .../streaming/EventTimeWatermarkExec.scala  |   7 +-
 .../execution/streaming/ProgressReporter.scala  |   7 +-
 .../execution/streaming/StreamExecution.scala   |   2 +-
 .../sql/streaming/EventTimeWatermarkSuite.scala | 287 +++
 .../spark/sql/streaming/WatermarkSuite.scala| 240 
 5 files changed, 299 insertions(+), 244 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index e8570d0..5a9a99e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,6 +84,11 @@ case class EventTimeWatermarkExec(
 child: SparkPlan) extends SparkPlan {
 
   val eventTimeStats = new EventTimeStatsAccum()
+  val delayMs = {
+val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+delay.milliseconds + delay.months * millisPerMonth
+  }
+
   sparkContext.register(eventTimeStats)
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -101,7 +106,7 @@ case class EventTimeWatermarkExec(
 if (a semanticEquals eventTime) {
   val updatedMetadata = new MetadataBuilder()
   .withMetadata(a.metadata)
-  .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+  .putLong(EventTimeWatermark.delayKey, delayMs)
   .build()
 
   a.withMetadata(updatedMetadata)

http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 2386f33..c5e9eae 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, 
LogicalPlan}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -182,7 +182,10 @@ trait ProgressReporter extends Logging {
 
   /** Extracts statistics from the most recent query execution. */
   private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
-val watermarkTimestamp = Map("watermark" -> 
formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e 
}.nonEmpty
+val watermarkTimestamp =
+  if (hasEventTime) Map("watermark" -> 
formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+  else Map.empty[String, String]
 
 if (!hasNewData) {
   return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)

http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/

spark git commit: [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6

2016-12-21 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master b7650f11c -> 1a6438897


[SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6

## What changes were proposed in this pull request?
I recently hit a bug in com.thoughtworks.paranamer/paranamer that causes 
jackson to fail to handle a byte array defined in a case class. 
https://github.com/FasterXML/jackson-module-scala/issues/48 suggests that it 
is caused by a bug in paranamer, so let's upgrade paranamer. Since we are 
using jackson 2.6.5, and jackson-module-paranamer 2.6.5 uses 
com.thoughtworks.paranamer/paranamer 2.6, I suggest that we upgrade paranamer 
to 2.6.
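
As a rough illustration of the failure mode described above, the sketch below round-trips a case
class with an `Array[Byte]` field through jackson-module-scala. The class name and data are
hypothetical; the jackson calls are standard API, and whether the round-trip actually breaks
depends on which paranamer version ends up on the classpath.

```scala
// Hypothetical repro sketch for the paranamer issue: serialize/deserialize a case
// class with an Array[Byte] field via jackson-module-scala.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

case class Payload(name: String, data: Array[Byte])

object ParanamerRepro {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)

    val json = mapper.writeValueAsString(Payload("block-1", Array[Byte](1, 2, 3)))
    // With a broken paranamer on the classpath, resolving the case-class constructor
    // parameter names during deserialization is where the failure shows up.
    val back = mapper.readValue(json, classOf[Payload])
    println(back.name + " -> " + back.data.toSeq)
  }
}
```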

Author: Yin Huai 

Closes #16359 from yhuai/SPARK-18951.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a643889
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a643889
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a643889

Branch: refs/heads/master
Commit: 1a64388973711b4e567f25fa33d752066a018b49
Parents: b7650f1
Author: Yin Huai 
Authored: Wed Dec 21 09:26:13 2016 -0800
Committer: Yin Huai 
Committed: Wed Dec 21 09:26:13 2016 -0800

--
 dev/deps/spark-deps-hadoop-2.2 | 2 +-
 dev/deps/spark-deps-hadoop-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-2.4 | 2 +-
 dev/deps/spark-deps-hadoop-2.6 | 2 +-
 dev/deps/spark-deps-hadoop-2.7 | 2 +-
 pom.xml| 7 ++-
 6 files changed, 11 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.2
--
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index afbdae0..9cbab3d8 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -128,7 +128,7 @@ objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
-paranamer-2.3.jar
+paranamer-2.6.jar
 parquet-column-1.8.1.jar
 parquet-common-1.8.1.jar
 parquet-encoding-1.8.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.3
--
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index adf3863..63ce6c6 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -135,7 +135,7 @@ objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
-paranamer-2.3.jar
+paranamer-2.6.jar
 parquet-column-1.8.1.jar
 parquet-common-1.8.1.jar
 parquet-encoding-1.8.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.4
--
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 88e6b3f..122d5c2 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -135,7 +135,7 @@ objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
-paranamer-2.3.jar
+paranamer-2.6.jar
 parquet-column-1.8.1.jar
 parquet-common-1.8.1.jar
 parquet-encoding-1.8.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 15c5d9f..776aabd 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -143,7 +143,7 @@ objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
-paranamer-2.3.jar
+paranamer-2.6.jar
 parquet-column-1.8.1.jar
 parquet-common-1.8.1.jar
 parquet-encoding-1.8.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 77fb537..524e824 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -144,7 +144,7 @@ objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
-paranamer-2.3.jar
+paranamer-2.6.jar
 parquet-column-1.8.1.jar
 parquet-common-1.8.1.jar
 parquet-encoding-1.8.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 4f12085..72e5442 100644
--- a/pom.xml
+++ b/pom.xml
@@ -179,7 +179,7 @@
 4.5.3
 1.1
 2.52.0
-    <paranamer.version>2.8</paranamer.version>
+    <paranamer.version>2.6</paranamer.version>
 1.8
 1.0.0
 
@@ -1863,6 +1863,11 @@
   
 
   
+      <dependency>
+        <groupId>com.thoughtworks.paranamer</groupId>
+        <artifactId>paranamer</artifactId>
+        <version>${paranamer.version}</version>
+      </dependency>
 
   
 


-

spark git commit: [SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables

2016-12-21 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2aae220b5 -> ef206ace2


[SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables

## What changes were proposed in this pull request?

It's wasteful to call `Catalog.listTables` in `SQLContext.tableNames`, which 
only needs the table names, because `Catalog.listTables` fetches the full 
table metadata for each table name.
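
For illustration, a minimal sketch of the two code paths from the caller's side is below, assuming
a local `SparkSession` named `spark` and a made-up temp view; `tableNames` only needs strings,
while `Catalog.listTables` materializes a `Table` object per entry.

```scala
// Sketch: the cheap name-only call vs. the metadata-per-table call it used to delegate to.
import org.apache.spark.sql.SparkSession

object TableNamesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    spark.range(10).createOrReplaceTempView("t1")

    // Returns just Array[String]; after this patch it no longer goes through Catalog.listTables.
    val names: Array[String] = spark.sqlContext.tableNames()

    // Returns Table objects carrying database, description, tableType, isTemporary, ...
    val tables = spark.catalog.listTables().collect()

    names.foreach(println)
    tables.foreach(t => println(s"${t.name} isTemporary=${t.isTemporary}"))
    spark.stop()
  }
}
```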

## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #16352 from cloud-fan/minor.

(cherry picked from commit b7650f11c7afbdffc6f5caaafb5dcfd54f7a25ff)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef206ace
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef206ace
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef206ace

Branch: refs/heads/branch-2.0
Commit: ef206ace24d8588782fd9bd4fdf120f20fbfe841
Parents: 2aae220
Author: Wenchen Fan 
Authored: Wed Dec 21 19:39:00 2016 +0800
Committer: Wenchen Fan 
Committed: Wed Dec 21 19:40:15 2016 +0800

--
 .../src/main/scala/org/apache/spark/sql/SQLContext.scala| 4 ++--
 .../main/scala/org/apache/spark/sql/api/r/SQLUtils.scala| 9 +
 2 files changed, 7 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ef206ace/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e7627ac..6013c01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -730,7 +730,7 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
* @since 1.3.0
*/
   def tableNames(): Array[String] = {
-sparkSession.catalog.listTables().collect().map(_.name)
+tableNames(sparkSession.catalog.currentDatabase)
   }
 
   /**
@@ -740,7 +740,7 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
* @since 1.3.0
*/
   def tableNames(databaseName: String): Array[String] = {
-sparkSession.catalog.listTables(databaseName).collect().map(_.name)
+sessionState.catalog.listTables(databaseName).map(_.table).toArray
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/ef206ace/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 08f8ded..8bc7f67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -276,11 +276,12 @@ private[sql] object SQLUtils extends Logging {
   }
 
   def getTableNames(sparkSession: SparkSession, databaseName: String): 
Array[String] = {
-databaseName match {
-  case n: String if n != null && n.trim.nonEmpty =>
-sparkSession.catalog.listTables(n).collect().map(_.name)
+val db = databaseName match {
+  case _ if databaseName != null && databaseName.trim.nonEmpty =>
+databaseName
   case _ =>
-sparkSession.catalog.listTables().collect().map(_.name)
+sparkSession.catalog.currentDatabase
 }
+sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables

2016-12-21 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 063a98e52 -> bc54a14b4


[SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables

## What changes were proposed in this pull request?

It's wasteful to call `Catalog.listTables` in `SQLContext.tableNames`, which 
only needs the table names, because `Catalog.listTables` fetches the full 
table metadata for each table name.

## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #16352 from cloud-fan/minor.

(cherry picked from commit b7650f11c7afbdffc6f5caaafb5dcfd54f7a25ff)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc54a14b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc54a14b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc54a14b

Branch: refs/heads/branch-2.1
Commit: bc54a14b415041531f94ccc2dd35851c269e8263
Parents: 063a98e
Author: Wenchen Fan 
Authored: Wed Dec 21 19:39:00 2016 +0800
Committer: Wenchen Fan 
Committed: Wed Dec 21 19:39:55 2016 +0800

--
 .../src/main/scala/org/apache/spark/sql/SQLContext.scala| 4 ++--
 .../main/scala/org/apache/spark/sql/api/r/SQLUtils.scala| 9 +
 2 files changed, 7 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bc54a14b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6554359..1a7fd68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -747,7 +747,7 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
* @since 1.3.0
*/
   def tableNames(): Array[String] = {
-sparkSession.catalog.listTables().collect().map(_.name)
+tableNames(sparkSession.catalog.currentDatabase)
   }
 
   /**
@@ -757,7 +757,7 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
* @since 1.3.0
*/
   def tableNames(databaseName: String): Array[String] = {
-sparkSession.catalog.listTables(databaseName).collect().map(_.name)
+sessionState.catalog.listTables(databaseName).map(_.table).toArray
   }
 
   

http://git-wip-us.apache.org/repos/asf/spark/blob/bc54a14b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 80bbad4..e56c33e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -276,11 +276,12 @@ private[sql] object SQLUtils extends Logging {
   }
 
   def getTableNames(sparkSession: SparkSession, databaseName: String): 
Array[String] = {
-databaseName match {
-  case n: String if n != null && n.trim.nonEmpty =>
-sparkSession.catalog.listTables(n).collect().map(_.name)
+val db = databaseName match {
+  case _ if databaseName != null && databaseName.trim.nonEmpty =>
+databaseName
   case _ =>
-sparkSession.catalog.listTables().collect().map(_.name)
+sparkSession.catalog.currentDatabase
 }
+sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables

2016-12-21 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master ba4468bb2 -> b7650f11c


[SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables

## What changes were proposed in this pull request?

It's wasteful to call `Catalog.listTables` in `SQLContext.tableNames`, which 
only needs the table names, because `Catalog.listTables` fetches the full 
table metadata for each table name.

## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #16352 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7650f11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7650f11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7650f11

Branch: refs/heads/master
Commit: b7650f11c7afbdffc6f5caaafb5dcfd54f7a25ff
Parents: ba4468b
Author: Wenchen Fan 
Authored: Wed Dec 21 19:39:00 2016 +0800
Committer: Wenchen Fan 
Committed: Wed Dec 21 19:39:00 2016 +0800

--
 .../src/main/scala/org/apache/spark/sql/SQLContext.scala| 4 ++--
 .../main/scala/org/apache/spark/sql/api/r/SQLUtils.scala| 9 +
 2 files changed, 7 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b7650f11/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6554359..1a7fd68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -747,7 +747,7 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
* @since 1.3.0
*/
   def tableNames(): Array[String] = {
-sparkSession.catalog.listTables().collect().map(_.name)
+tableNames(sparkSession.catalog.currentDatabase)
   }
 
   /**
@@ -757,7 +757,7 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
* @since 1.3.0
*/
   def tableNames(databaseName: String): Array[String] = {
-sparkSession.catalog.listTables(databaseName).collect().map(_.name)
+sessionState.catalog.listTables(databaseName).map(_.table).toArray
   }
 
   

http://git-wip-us.apache.org/repos/asf/spark/blob/b7650f11/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 80bbad4..e56c33e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -276,11 +276,12 @@ private[sql] object SQLUtils extends Logging {
   }
 
   def getTableNames(sparkSession: SparkSession, databaseName: String): 
Array[String] = {
-databaseName match {
-  case n: String if n != null && n.trim.nonEmpty =>
-sparkSession.catalog.listTables(n).collect().map(_.name)
+val db = databaseName match {
+  case _ if databaseName != null && databaseName.trim.nonEmpty =>
+databaseName
   case _ =>
-sparkSession.catalog.listTables().collect().map(_.name)
+sparkSession.catalog.currentDatabase
 }
+sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18923][DOC][BUILD] Support skipping R/Python API docs

2016-12-21 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 24c0c9412 -> ba4468bb2


[SPARK-18923][DOC][BUILD] Support skipping R/Python API docs

## What changes were proposed in this pull request?

We can build the Python API docs separately with `cd ./python/docs && make html`, 
and the R API docs with `cd ./R && sh create-docs.sh`. However, `jekyll` fails in 
some environments.

This PR adds `SKIP_PYTHONDOC` and `SKIP_RDOC` options for the documentation 
build in the `docs` folder. Currently, we can only use `SKIP_SCALADOC` or 
`SKIP_API`. The reason for providing additional options is that the Spark 
documentation build uses a number of tools to build the HTML docs and the API 
docs in Scala, Python and R. Specifically, for Python and R:

- Python API docs require `sphinx`.
- R API docs require an `R` installation and `knitr` (and several other libraries).

In other words, today we cannot generate the Python API docs without an R 
installation, nor the R API docs without a Python `sphinx` installation. If 
Spark provided `SKIP_PYTHONDOC` and `SKIP_RDOC` alongside `SKIP_SCALADOC`, it 
would be more convenient.

## How was this patch tested?

Manual.

**Skipping Scala/Java/Python API Doc Build**
```bash
$ cd docs
$ SKIP_SCALADOC=1 SKIP_PYTHONDOC=1 jekyll build
$ ls api
DESCRIPTION R
```

**Skipping Scala/Java/R API Doc Build**
```bash
$ cd docs
$ SKIP_SCALADOC=1 SKIP_RDOC=1 jekyll build
$ ls api
python
```

Author: Dongjoon Hyun 

Closes #16336 from dongjoon-hyun/SPARK-18923.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba4468bb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba4468bb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba4468bb

Branch: refs/heads/master
Commit: ba4468bb24f2e13b98e7b66b44203c20393b8ab8
Parents: 24c0c94
Author: Dongjoon Hyun 
Authored: Wed Dec 21 08:59:38 2016 +
Committer: Sean Owen 
Committed: Wed Dec 21 08:59:38 2016 +

--
 docs/README.md |  3 ++-
 docs/_plugins/copy_api_dirs.rb | 49 -
 2 files changed, 29 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ba4468bb/docs/README.md
--
diff --git a/docs/README.md b/docs/README.md
index ffd3b57..90e10a1 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -69,4 +69,5 @@ may take some time as it generates all of the scaladoc.  The 
jekyll plugin also
 PySpark docs using [Sphinx](http://sphinx-doc.org/).
 
 NOTE: To skip the step of building and copying over the Scala, Python, R API 
docs, run `SKIP_API=1
-jekyll`.
+jekyll`. In addition, `SKIP_SCALADOC=1`, `SKIP_PYTHONDOC=1`, and `SKIP_RDOC=1` 
can be used to skip a single
+step of the corresponding language.

http://git-wip-us.apache.org/repos/asf/spark/blob/ba4468bb/docs/_plugins/copy_api_dirs.rb
--
diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb
index 71e6432..95e3ba3 100644
--- a/docs/_plugins/copy_api_dirs.rb
+++ b/docs/_plugins/copy_api_dirs.rb
@@ -113,36 +113,41 @@ if not (ENV['SKIP_API'] == '1')
 File.open(css_file, 'a') { |f| f.write("\n" + css.join()) }
   end
 
-  # Build Sphinx docs for Python
+  if not (ENV['SKIP_PYTHONDOC'] == '1')
+# Build Sphinx docs for Python
 
-  puts "Moving to python/docs directory and building sphinx."
-  cd("../python/docs")
-  system("make html") || raise("Python doc generation failed")
+puts "Moving to python/docs directory and building sphinx."
+cd("../python/docs")
+system("make html") || raise("Python doc generation failed")
 
-  puts "Moving back into home dir."
-  cd("../../")
+puts "Moving back into docs dir."
+cd("../../docs")
 
-  puts "Making directory api/python"
-  mkdir_p "docs/api/python"
+puts "Making directory api/python"
+mkdir_p "api/python"
 
-  puts "cp -r python/docs/_build/html/. docs/api/python"
-  cp_r("python/docs/_build/html/.", "docs/api/python")
+puts "cp -r ../python/docs/_build/html/. api/python"
+cp_r("../python/docs/_build/html/.", "api/python")
+  end
 
-  # Build SparkR API docs
-  puts "Moving to R directory and building roxygen docs."
-  cd("R")
-  system("./create-docs.sh") || raise("R doc generation failed")
+  if not (ENV['SKIP_RDOC'] == '1')
+# Build SparkR API docs
 
-  puts "Moving back into home dir."
-  cd("../")
+puts "Moving to R directory and building roxygen docs."
+cd("../R")
+system("./create-docs.sh") || raise("R doc generation failed")
 
-  puts "Making directory api/R"
-  mkdir_p "docs/api/R"
+puts "Moving back into docs dir."
+cd("../docs")
 
-  puts "cp -r R/pkg/html/. docs/api/R"
-  cp_r("R/pkg/html/.", "docs/api/R")
+puts "Making directory api/R"
+mkdir_p "a