spark git commit: [DOC] bucketing is applicable to all file-based data sources
Repository: spark Updated Branches: refs/heads/master 7c5b7b3a2 -> 2e861df96 [DOC] bucketing is applicable to all file-based data sources ## What changes were proposed in this pull request? Starting Spark 2.1.0, bucketing feature is available for all file-based data sources. This patch fixes some function docs that haven't yet been updated to reflect that. ## How was this patch tested? N/A Author: Reynold Xin Closes #16349 from rxin/ds-doc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e861df9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e861df9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e861df9 Branch: refs/heads/master Commit: 2e861df96eacd821edbbd9883121bff67611074f Parents: 7c5b7b3 Author: Reynold Xin Authored: Wed Dec 21 23:46:33 2016 -0800 Committer: Reynold Xin Committed: Wed Dec 21 23:46:33 2016 -0800 -- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2e861df9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index d33f7da..9c5660a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -150,7 +150,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * predicates on the partitioned columns. In order for partitioning to work well, the number * of distinct values in each column should typically be less than tens of thousands. * - * This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well. + * This is applicable for all file-based data sources (e.g. Parquet, JSON) staring Spark 2.1.0. * * @since 1.4.0 */ @@ -164,7 +164,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * Buckets the output by the given columns. If specified, the output is laid out on the file * system similar to Hive's bucketing scheme. * - * This is applicable for Parquet, JSON and ORC. + * This is applicable for all file-based data sources (e.g. Parquet, JSON) staring Spark 2.1.0. * * @since 2.0 */ @@ -178,7 +178,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { /** * Sorts the output in each bucket by the given columns. * - * This is applicable for Parquet, JSON and ORC. + * This is applicable for all file-based data sources (e.g. Parquet, JSON) staring Spark 2.1.0. * * @since 2.0 */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
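For context, a minimal sketch of the writer APIs whose scaladoc this commit touches (`partitionBy`, `bucketBy`, `sortBy`). The sample data, output path, and table name below are illustrative only and are not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bucketing-example").master("local[2]").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1, "2016-12-21"), ("b", 2, "2016-12-22")).toDF("id", "value", "day")

// partitionBy applies to any file-based source (Parquet, JSON, text, ORC, CSV, ...).
df.write.partitionBy("day").json("/tmp/events_json")

// bucketBy/sortBy require saveAsTable, but likewise apply to any file-based source.
df.write
  .format("parquet")
  .bucketBy(8, "id")
  .sortBy("id")
  .saveAsTable("events_bucketed")

spark.stop()
```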
spark git commit: [DOC] bucketing is applicable to all file-based data sources
Repository: spark Updated Branches: refs/heads/branch-2.1 def3690f6 -> ec0d6e21e [DOC] bucketing is applicable to all file-based data sources ## What changes were proposed in this pull request? Starting Spark 2.1.0, bucketing feature is available for all file-based data sources. This patch fixes some function docs that haven't yet been updated to reflect that. ## How was this patch tested? N/A Author: Reynold Xin Closes #16349 from rxin/ds-doc. (cherry picked from commit 2e861df96eacd821edbbd9883121bff67611074f) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec0d6e21 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec0d6e21 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec0d6e21 Branch: refs/heads/branch-2.1 Commit: ec0d6e21ed85164fd7eb519ec1d017497122c55c Parents: def3690 Author: Reynold Xin Authored: Wed Dec 21 23:46:33 2016 -0800 Committer: Reynold Xin Committed: Wed Dec 21 23:46:38 2016 -0800 -- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ec0d6e21/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index fa8e8cb..44c407d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -150,7 +150,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * predicates on the partitioned columns. In order for partitioning to work well, the number * of distinct values in each column should typically be less than tens of thousands. * - * This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well. + * This is applicable for all file-based data sources (e.g. Parquet, JSON) staring Spark 2.1.0. * * @since 1.4.0 */ @@ -164,7 +164,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * Buckets the output by the given columns. If specified, the output is laid out on the file * system similar to Hive's bucketing scheme. * - * This is applicable for Parquet, JSON and ORC. + * This is applicable for all file-based data sources (e.g. Parquet, JSON) staring Spark 2.1.0. * * @since 2.0 */ @@ -178,7 +178,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { /** * Sorts the output in each bucket by the given columns. * - * This is applicable for Parquet, JSON and ORC. + * This is applicable for all file-based data sources (e.g. Parquet, JSON) staring Spark 2.1.0. * * @since 2.0 */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SQL] Minor readability improvement for partition handling code
Repository: spark Updated Branches: refs/heads/branch-2.1 07e2a17d1 -> def3690f6 [SQL] Minor readability improvement for partition handling code This patch includes minor changes to improve readability for partition handling code. I'm in the middle of implementing some new feature and found some naming / implicit type inference not as intuitive. This patch should have no semantic change and the changes should be covered by existing test cases. Author: Reynold Xin Closes #16378 from rxin/minor-fix. (cherry picked from commit 7c5b7b3a2e5a7c1b2d0d8ce655840cad581e47ac) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/def3690f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/def3690f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/def3690f Branch: refs/heads/branch-2.1 Commit: def3690f6889979226478bf9c35a240d7e0662e6 Parents: 07e2a17 Author: Reynold Xin Authored: Thu Dec 22 15:29:56 2016 +0800 Committer: Reynold Xin Committed: Wed Dec 21 23:45:16 2016 -0800 -- .../sql/execution/DataSourceScanExec.scala | 7 +- .../datasources/CatalogFileIndex.scala | 11 +-- .../sql/execution/datasources/FileFormat.scala | 3 +- .../execution/datasources/FileStatusCache.scala | 72 ++-- 4 files changed, 49 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/def3690f/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index e485b52..7616164 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -136,7 +136,7 @@ case class RowDataSourceScanExec( * @param outputSchema Output schema of the scan. * @param partitionFilters Predicates to use for partition pruning. * @param dataFilters Data source filters to use for filtering data within partitions. - * @param metastoreTableIdentifier + * @param metastoreTableIdentifier identifier for the table in the metastore. */ case class FileSourceScanExec( @transient relation: HadoopFsRelation, @@ -147,10 +147,10 @@ case class FileSourceScanExec( override val metastoreTableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec { - val supportsBatch = relation.fileFormat.supportBatch( + val supportsBatch: Boolean = relation.fileFormat.supportBatch( relation.sparkSession, StructType.fromAttributes(output)) - val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) { + val needsUnsafeRowConversion: Boolean = if (relation.fileFormat.isInstanceOf[ParquetSource]) { SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled } else { false @@ -516,7 +516,6 @@ case class FileSourceScanExec( } // Assign files to partitions using "First Fit Decreasing" (FFD) -// TODO: consider adding a slop factor here? 
splitFiles.foreach { file => if (currentSize + file.length > maxSplitBytes) { closePartition() http://git-wip-us.apache.org/repos/asf/spark/blob/def3690f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 4ad91dc..1235a4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession @@ -37,14 +38,15 @@ class CatalogFileIndex( val table: CatalogTable, override val sizeInBytes: Long) extends FileIndex { - protected val hadoopConf = sparkSession.sessionState.newHadoopConf + protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf() - private val fileStatusCache = FileStatusCache.newCache(sparkSession) + /** Globally shared (not exclusive to this table) cache for file statuses to speed up listing. */ + private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) assert(table.identifier.database.isDefined, "The table identifier must be
spark git commit: [SQL] Minor readability improvement for partition handling code
Repository: spark Updated Branches: refs/heads/master ff7d82a20 -> 7c5b7b3a2 [SQL] Minor readability improvement for partition handling code ## What changes were proposed in this pull request? This patch includes minor changes to improve readability for partition handling code. I'm in the middle of implementing some new feature and found some naming / implicit type inference not as intuitive. ## How was this patch tested? This patch should have no semantic change and the changes should be covered by existing test cases. Author: Reynold Xin Closes #16378 from rxin/minor-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c5b7b3a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c5b7b3a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c5b7b3a Branch: refs/heads/master Commit: 7c5b7b3a2e5a7c1b2d0d8ce655840cad581e47ac Parents: ff7d82a Author: Reynold Xin Authored: Thu Dec 22 15:29:56 2016 +0800 Committer: Wenchen Fan Committed: Thu Dec 22 15:29:56 2016 +0800 -- .../sql/execution/DataSourceScanExec.scala | 7 +- .../datasources/CatalogFileIndex.scala | 11 +-- .../sql/execution/datasources/FileFormat.scala | 3 +- .../execution/datasources/FileStatusCache.scala | 72 ++-- 4 files changed, 49 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7c5b7b3a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index e485b52..7616164 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -136,7 +136,7 @@ case class RowDataSourceScanExec( * @param outputSchema Output schema of the scan. * @param partitionFilters Predicates to use for partition pruning. * @param dataFilters Data source filters to use for filtering data within partitions. - * @param metastoreTableIdentifier + * @param metastoreTableIdentifier identifier for the table in the metastore. */ case class FileSourceScanExec( @transient relation: HadoopFsRelation, @@ -147,10 +147,10 @@ case class FileSourceScanExec( override val metastoreTableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec { - val supportsBatch = relation.fileFormat.supportBatch( + val supportsBatch: Boolean = relation.fileFormat.supportBatch( relation.sparkSession, StructType.fromAttributes(output)) - val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) { + val needsUnsafeRowConversion: Boolean = if (relation.fileFormat.isInstanceOf[ParquetSource]) { SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled } else { false @@ -516,7 +516,6 @@ case class FileSourceScanExec( } // Assign files to partitions using "First Fit Decreasing" (FFD) -// TODO: consider adding a slop factor here? 
splitFiles.foreach { file => if (currentSize + file.length > maxSplitBytes) { closePartition() http://git-wip-us.apache.org/repos/asf/spark/blob/7c5b7b3a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 4ad91dc..1235a4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession @@ -37,14 +38,15 @@ class CatalogFileIndex( val table: CatalogTable, override val sizeInBytes: Long) extends FileIndex { - protected val hadoopConf = sparkSession.sessionState.newHadoopConf + protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf() - private val fileStatusCache = FileStatusCache.newCache(sparkSession) + /** Globally shared (not exclusive to this table) cache for file statuses to speed up listing. */ + private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) assert(table.identifier.database.isDefined, "The table identifier must be qualified in CatalogFil
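The gist of the readability change — spelling out result types on non-private members instead of relying on inference, and renaming the cache accessor to `getOrCreate` — can be shown with a tiny standalone example (names below are invented for illustration, not taken from the patch):

```scala
object ExplicitTypesExample {
  private def readFlag(name: String): Boolean = sys.env.get(name).contains("true")

  // Before: readers had to infer the type from the right-hand side:
  //   val vectorizedEnabled = readFlag("VECTORIZED_READER_ENABLED")
  // After: the result type of the non-private member is stated explicitly,
  // as this patch does for supportsBatch and needsUnsafeRowConversion.
  val vectorizedEnabled: Boolean = readFlag("VECTORIZED_READER_ENABLED")

  def main(args: Array[String]): Unit =
    println(s"vectorized reader enabled: $vectorizedEnabled")
}
```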
spark git commit: [SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created
Repository: spark Updated Branches: refs/heads/branch-2.1 9a3c5bd70 -> 07e2a17d1 [SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created ## What changes were proposed in this pull request? This PR audits places using `logicalPlan` in StreamExecution and ensures they all handles the case that `logicalPlan` cannot be created. In addition, this PR also fixes the following issues in `StreamingQueryException`: - `StreamingQueryException` and `StreamExecution` are cycle-dependent because in the `StreamingQueryException`'s constructor, it calls `StreamExecution`'s `toDebugString` which uses `StreamingQueryException`. Hence it will output `null` value in the error message. - Duplicated stack trace when calling Throwable.printStackTrace because StreamingQueryException's toString contains the stack trace. ## How was this patch tested? The updated `test("max files per trigger - incorrect values")`. I found this issue when I switched from `testStream` to the real codes to verify the failure in this test. Author: Shixiong Zhu Closes #16322 from zsxwing/SPARK-18907. (cherry picked from commit ff7d82a207e8bef7779c27378f7a50a138627341) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07e2a17d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07e2a17d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07e2a17d Branch: refs/heads/branch-2.1 Commit: 07e2a17d1cb7eade93d482d18a2079e9e6f40f57 Parents: 9a3c5bd Author: Shixiong Zhu Authored: Wed Dec 21 22:02:57 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 21 22:03:05 2016 -0800 -- .../execution/streaming/StreamExecution.scala | 141 --- .../sql/streaming/StreamingQueryException.scala | 28 +--- .../sql/streaming/FileStreamSourceSuite.scala | 39 +++-- .../spark/sql/streaming/StreamSuite.scala | 3 +- .../apache/spark/sql/streaming/StreamTest.scala | 52 +-- .../streaming/StreamingQueryListenerSuite.scala | 2 +- .../sql/streaming/StreamingQuerySuite.scala | 4 +- 7 files changed, 165 insertions(+), 104 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/07e2a17d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index e05200d..a35950e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} -import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.streaming._ @@ -67,6 +66,7 @@ class StreamExecution( private val awaitBatchLock = new ReentrantLock(true) private val awaitBatchLockCondition = awaitBatchLock.newCondition() + private val initializationLatch = new CountDownLatch(1) private val startLatch = new CountDownLatch(1) private val terminationLatch = new CountDownLatch(1) @@ -118,9 +118,22 @@ class StreamExecution( 
private val prettyIdString = Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]" + /** + * All stream sources present in the query plan. This will be set when generating logical plan. + */ + @volatile protected var sources: Seq[Source] = Seq.empty + + /** + * A list of unique sources in the query plan. This will be set when generating logical plan. + */ + @volatile private var uniqueSources: Seq[Source] = Seq.empty + override lazy val logicalPlan: LogicalPlan = { +assert(microBatchThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") var nextSourceId = 0L -analyzedPlan.transform { +val _logicalPlan = analyzedPlan.transform { case StreamingRelation(dataSource, _, output) => // Materialize source to avoid creating it in every batch val metadataPath = s"$checkpointRoot/sources/$nextSourceId" @@ -130,22 +143,18 @@ class StreamExecution( // "df.logicalPlan" has already used attributes of the previous `output`.
spark git commit: [SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created
Repository: spark Updated Branches: refs/heads/master e1b43dc45 -> ff7d82a20 [SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created ## What changes were proposed in this pull request? This PR audits places using `logicalPlan` in StreamExecution and ensures they all handles the case that `logicalPlan` cannot be created. In addition, this PR also fixes the following issues in `StreamingQueryException`: - `StreamingQueryException` and `StreamExecution` are cycle-dependent because in the `StreamingQueryException`'s constructor, it calls `StreamExecution`'s `toDebugString` which uses `StreamingQueryException`. Hence it will output `null` value in the error message. - Duplicated stack trace when calling Throwable.printStackTrace because StreamingQueryException's toString contains the stack trace. ## How was this patch tested? The updated `test("max files per trigger - incorrect values")`. I found this issue when I switched from `testStream` to the real codes to verify the failure in this test. Author: Shixiong Zhu Closes #16322 from zsxwing/SPARK-18907. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff7d82a2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff7d82a2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff7d82a2 Branch: refs/heads/master Commit: ff7d82a207e8bef7779c27378f7a50a138627341 Parents: e1b43dc Author: Shixiong Zhu Authored: Wed Dec 21 22:02:57 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 21 22:02:57 2016 -0800 -- .../execution/streaming/StreamExecution.scala | 141 --- .../sql/streaming/StreamingQueryException.scala | 28 +--- .../sql/streaming/FileStreamSourceSuite.scala | 39 +++-- .../spark/sql/streaming/StreamSuite.scala | 3 +- .../apache/spark/sql/streaming/StreamTest.scala | 52 +-- .../streaming/StreamingQueryListenerSuite.scala | 2 +- .../sql/streaming/StreamingQuerySuite.scala | 4 +- 7 files changed, 165 insertions(+), 104 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index e05200d..a35950e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} -import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.streaming._ @@ -67,6 +66,7 @@ class StreamExecution( private val awaitBatchLock = new ReentrantLock(true) private val awaitBatchLockCondition = awaitBatchLock.newCondition() + private val initializationLatch = new CountDownLatch(1) private val startLatch = new CountDownLatch(1) private val terminationLatch = new CountDownLatch(1) @@ -118,9 +118,22 @@ class StreamExecution( private val prettyIdString = Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]" + 
/** + * All stream sources present in the query plan. This will be set when generating logical plan. + */ + @volatile protected var sources: Seq[Source] = Seq.empty + + /** + * A list of unique sources in the query plan. This will be set when generating logical plan. + */ + @volatile private var uniqueSources: Seq[Source] = Seq.empty + override lazy val logicalPlan: LogicalPlan = { +assert(microBatchThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") var nextSourceId = 0L -analyzedPlan.transform { +val _logicalPlan = analyzedPlan.transform { case StreamingRelation(dataSource, _, output) => // Materialize source to avoid creating it in every batch val metadataPath = s"$checkpointRoot/sources/$nextSourceId" @@ -130,22 +143,18 @@ class StreamExecution( // "df.logicalPlan" has already used attributes of the previous `output`. StreamingExecutionRelation(source, output) } +sources = _logicalPlan.collect { case s: St
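The core pattern introduced here — forcing `logicalPlan` only on the dedicated stream-execution thread and publishing the discovered sources through `@volatile` fields — can be sketched in isolation as follows. The class and field names are simplified stand-ins, not the real `StreamExecution` code:

```scala
// Simplified stand-in: the plan may only be initialized on the dedicated execution
// thread, and building it publishes derived state via a @volatile field.
class PlanHolder(expectedThread: () => Thread) {
  @volatile var sources: Seq[String] = Seq.empty

  lazy val logicalPlan: String = {
    assert(Thread.currentThread eq expectedThread(),
      "logicalPlan must be initialized on the execution thread, " +
        s"but the current thread was ${Thread.currentThread.getName}")
    sources = Seq("fileStreamSource-0") // set as a side effect of plan construction
    "StreamingExecutionRelation -> Aggregate"
  }
}

object PlanHolderExample {
  def main(args: Array[String]): Unit = {
    var worker: Thread = null
    val holder = new PlanHolder(() => worker)
    worker = new Thread("stream-execution-thread") {
      override def run(): Unit = println(holder.logicalPlan)
    }
    worker.start()
    worker.join()
    println(s"sources discovered while planning: ${holder.sources}")
  }
}
```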
spark git commit: [BUILD] make-distribution should find JAVA_HOME for non-RHEL systems
Repository: spark Updated Branches: refs/heads/master afe36516e -> e1b43dc45 [BUILD] make-distribution should find JAVA_HOME for non-RHEL systems ## What changes were proposed in this pull request? make-distribution.sh should find JAVA_HOME for Ubuntu, Mac and other non-RHEL systems ## How was this patch tested? Manually Author: Felix Cheung Closes #16363 from felixcheung/buildjava. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1b43dc4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1b43dc4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1b43dc4 Branch: refs/heads/master Commit: e1b43dc45b136a89f105c04c3074233f52568152 Parents: afe3651 Author: Felix Cheung Authored: Wed Dec 21 17:24:53 2016 -0800 Committer: Felix Cheung Committed: Wed Dec 21 17:24:53 2016 -0800 -- dev/make-distribution.sh | 7 +++ 1 file changed, 7 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e1b43dc4/dev/make-distribution.sh -- diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh index 6ea319e..6c5ae0d 100755 --- a/dev/make-distribution.sh +++ b/dev/make-distribution.sh @@ -102,6 +102,13 @@ if [ -z "$JAVA_HOME" ]; then echo "No JAVA_HOME set, proceeding with '$JAVA_HOME' learned from rpm" fi fi + + if [ -z "$JAVA_HOME" ]; then +if [ `command -v java` ]; then + # If java is in /usr/bin/java, we want /usr + JAVA_HOME="$(dirname $(dirname $(which java)))" +fi + fi fi if [ -z "$JAVA_HOME" ]; then - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [FLAKY-TEST] InputStreamsSuite.socket input stream
Repository: spark Updated Branches: refs/heads/branch-2.1 021952d58 -> 9a3c5bd70 [FLAKY-TEST] InputStreamsSuite.socket input stream ## What changes were proposed in this pull request? https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.streaming.InputStreamsSuite&test_name=socket+input+stream ## How was this patch tested? Tested 2,000 times. Author: Burak Yavuz Closes #16343 from brkyvz/sock. (cherry picked from commit afe36516e4b4031196ee2e0a04980ac49208ea6b) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a3c5bd7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a3c5bd7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a3c5bd7 Branch: refs/heads/branch-2.1 Commit: 9a3c5bd7082474cfb01f021aef103e44d12e2ff1 Parents: 021952d Author: Burak Yavuz Authored: Wed Dec 21 17:23:48 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 21 17:23:58 2016 -0800 -- .../spark/streaming/InputStreamsSuite.scala | 55 1 file changed, 23 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9a3c5bd7/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 9ecfa48..6fb50a4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -67,42 +67,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val expectedOutput = input.map(_.toString) for (i <- input.indices) { testServer.send(input(i).toString + "\n") - Thread.sleep(500) clock.advance(batchDuration.milliseconds) } -// Make sure we finish all batches before "stop" -if (!batchCounter.waitUntilBatchesCompleted(input.size, 3)) { - fail("Timeout: cannot finish all batches in 30 seconds") + +eventually(eventuallyTimeout) { + clock.advance(batchDuration.milliseconds) + // Verify whether data received was as expected + logInfo("") + logInfo("output.size = " + outputQueue.size) + logInfo("output") + outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray + assert(output.length === expectedOutput.size) + for (i <- output.indices) { +assert(output(i) === expectedOutput(i)) + } } -// Ensure progress listener has been notified of all events -ssc.sparkContext.listenerBus.waitUntilEmpty(500) - -// Verify all "InputInfo"s have been reported -assert(ssc.progressListener.numTotalReceivedRecords === input.size) -assert(ssc.progressListener.numTotalProcessedRecords === input.size) - -logInfo("Stopping server") -testServer.stop() -logInfo("Stopping context") -ssc.stop() - -// Verify whether data received was as expected -logInfo("") -logInfo("output.size = " + outputQueue.size) -logInfo("output") -outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]")) -logInfo("expected output.size = " + expectedOutput.size) -logInfo("expected output") -expectedOutput.foreach(x => logInfo("[" + 
x.mkString(",") + "]")) -logInfo("") - -// Verify whether all the elements received are as expected -// (whether the elements were received one in each interval is not verified) -val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray -assert(output.length === expectedOutput.size) -for (i <- output.indices) { - assert(output(i) === expectedOutput(i)) +eventually(eventuallyTimeout) { + assert(ssc.progressListener.numTotalReceivedRecords === input.length) + assert(ssc.progressListener.numTotalProcessedRecords === input.length) } } } -
spark git commit: [FLAKY-TEST] InputStreamsSuite.socket input stream
Repository: spark Updated Branches: refs/heads/master 7e8994ffd -> afe36516e [FLAKY-TEST] InputStreamsSuite.socket input stream ## What changes were proposed in this pull request? https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.streaming.InputStreamsSuite&test_name=socket+input+stream ## How was this patch tested? Tested 2,000 times. Author: Burak Yavuz Closes #16343 from brkyvz/sock. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afe36516 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afe36516 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afe36516 Branch: refs/heads/master Commit: afe36516e4b4031196ee2e0a04980ac49208ea6b Parents: 7e8994f Author: Burak Yavuz Authored: Wed Dec 21 17:23:48 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 21 17:23:48 2016 -0800 -- .../spark/streaming/InputStreamsSuite.scala | 55 1 file changed, 23 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/afe36516/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 9ecfa48..6fb50a4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -67,42 +67,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val expectedOutput = input.map(_.toString) for (i <- input.indices) { testServer.send(input(i).toString + "\n") - Thread.sleep(500) clock.advance(batchDuration.milliseconds) } -// Make sure we finish all batches before "stop" -if (!batchCounter.waitUntilBatchesCompleted(input.size, 3)) { - fail("Timeout: cannot finish all batches in 30 seconds") + +eventually(eventuallyTimeout) { + clock.advance(batchDuration.milliseconds) + // Verify whether data received was as expected + logInfo("") + logInfo("output.size = " + outputQueue.size) + logInfo("output") + outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray + assert(output.length === expectedOutput.size) + for (i <- output.indices) { +assert(output(i) === expectedOutput(i)) + } } -// Ensure progress listener has been notified of all events -ssc.sparkContext.listenerBus.waitUntilEmpty(500) - -// Verify all "InputInfo"s have been reported -assert(ssc.progressListener.numTotalReceivedRecords === input.size) -assert(ssc.progressListener.numTotalProcessedRecords === input.size) - -logInfo("Stopping server") -testServer.stop() -logInfo("Stopping context") -ssc.stop() - -// Verify whether data received was as expected -logInfo("") -logInfo("output.size = " + outputQueue.size) -logInfo("output") -outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]")) -logInfo("expected output.size = " + expectedOutput.size) -logInfo("expected output") -expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) -logInfo("") - -// Verify whether all the elements received are as expected -// 
(whether the elements were received one in each interval is not verified) -val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray -assert(output.length === expectedOutput.size) -for (i <- output.indices) { - assert(output(i) === expectedOutput(i)) +eventually(eventuallyTimeout) { + assert(ssc.progressListener.numTotalReceivedRecords === input.length) + assert(ssc.progressListener.numTotalProcessedRecords === input.length) } } } - To unsubscribe, e-mail: commits-unsubsc
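The fix replaces fixed `Thread.sleep(500)` waits with ScalaTest's `eventually`, which retries an assertion until it passes or a timeout expires. A self-contained sketch of that pattern follows; the counter and background thread are stand-ins for the receiver and batch completion, not code from the suite:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyExample {
  @volatile private var received = 0

  def main(args: Array[String]): Unit = {
    val producer = new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(200); received = 5 }
    })
    producer.start()

    // Instead of sleeping a fixed amount and hoping the work finished,
    // poll the assertion until it holds (or fail after 10 seconds).
    eventually(timeout(10.seconds), interval(50.milliseconds)) {
      assert(received == 5, s"still waiting, received = $received")
    }
    producer.join()
    println("condition reached without a fixed sleep")
  }
}
```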
spark git commit: [SPARK-18903][SPARKR] Add API to get SparkUI URL
Repository: spark Updated Branches: refs/heads/master b41ec9977 -> 7e8994ffd [SPARK-18903][SPARKR] Add API to get SparkUI URL ## What changes were proposed in this pull request? API for SparkUI URL from SparkContext ## How was this patch tested? manual, unit tests Author: Felix Cheung Closes #16367 from felixcheung/rwebui. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e8994ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e8994ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e8994ff Branch: refs/heads/master Commit: 7e8994ffd3d646adb0a769229637931e43cd12b0 Parents: b41ec99 Author: Felix Cheung Authored: Wed Dec 21 17:21:17 2016 -0800 Committer: Felix Cheung Committed: Wed Dec 21 17:21:17 2016 -0800 -- R/pkg/NAMESPACE | 1 + R/pkg/R/sparkR.R | 24 R/pkg/inst/tests/testthat/test_sparkSQL.R | 5 - 3 files changed, 29 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7e8994ff/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 377f942..c3ec3f4 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -16,6 +16,7 @@ export("sparkR.stop") export("sparkR.session.stop") export("sparkR.conf") export("sparkR.version") +export("sparkR.uiWebUrl") export("print.jobj") export("sparkR.newJObject") http://git-wip-us.apache.org/repos/asf/spark/blob/7e8994ff/R/pkg/R/sparkR.R -- diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index c57cc8f..e9d42c1 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -410,6 +410,30 @@ sparkR.session <- function( sparkSession } +#' Get the URL of the SparkUI instance for the current active SparkSession +#' +#' Get the URL of the SparkUI instance for the current active SparkSession. +#' +#' @return the SparkUI URL, or NA if it is disabled, or not started. +#' @rdname sparkR.uiWebUrl +#' @name sparkR.uiWebUrl +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' url <- sparkR.uiWebUrl() +#' } +#' @note sparkR.uiWebUrl since 2.2.0 +sparkR.uiWebUrl <- function() { + sc <- sparkR.callJMethod(getSparkContext(), "sc") + u <- callJMethod(sc, "uiWebUrl") + if (callJMethod(u, "isDefined")) { +callJMethod(u, "get") + } else { +NA + } +} + #' Assigns a group ID to all the jobs started by this thread until the group ID is set to a #' different value or cleared. 
#' http://git-wip-us.apache.org/repos/asf/spark/blob/7e8994ff/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 2e95737..4490f31 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2613,7 +2613,7 @@ test_that("randomSplit", { expect_true(all(sapply(abs(counts / num - weights / sum(weights)), function(e) { e < 0.05 }))) }) -test_that("Setting and getting config on SparkSession", { +test_that("Setting and getting config on SparkSession, sparkR.conf(), sparkR.uiWebUrl()", { # first, set it to a random but known value conf <- callJMethod(sparkSession, "conf") property <- paste0("spark.testing.", as.character(runif(1))) @@ -2637,6 +2637,9 @@ test_that("Setting and getting config on SparkSession", { expect_equal(appNameValue, "sparkSession test") expect_equal(testValue, value) expect_error(sparkR.conf("completely.dummy"), "Config 'completely.dummy' is not set") + + url <- sparkR.uiWebUrl() + expect_equal(substr(url, 1, 7), "http://";) }) test_that("enableHiveSupport on SparkSession", { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
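The new R function is a thin wrapper over the JVM-side `SparkContext.uiWebUrl`, which is an `Option[String]` that is empty when the UI is disabled or not yet started. A minimal Scala equivalent, for reference:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ui-url-example").master("local[2]").getOrCreate()
// None when spark.ui.enabled=false or the UI has not been started.
val url: Option[String] = spark.sparkContext.uiWebUrl
println(url.getOrElse("UI not started"))
spark.stop()
```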
spark git commit: [SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer
Repository: spark Updated Branches: refs/heads/branch-2.0 53cd99f65 -> 080ac37fb [SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer ## What changes were proposed in this pull request? This pr is to fix an `NullPointerException` issue caused by a following `limit + aggregate` query; ``` scala> val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value") scala> df.limit(2).groupBy("id").count().show WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) ``` The root culprit is that [`$doAgg()`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L596) skips an initialization of [the buffer iterator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L603); `BaseLimitExec` sets `stopEarly=true` and `$doAgg()` exits in the middle without the initialization. ## How was this patch tested? Added a test to check if no exception happens for limit + aggregates in `DataFrameAggregateSuite.scala`. Author: Takeshi YAMAMURO Closes #15980 from maropu/SPARK-18528. (cherry picked from commit b41ec997786e2be42a8a2a182212a610d08b221b) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/080ac37f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/080ac37f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/080ac37f Branch: refs/heads/branch-2.0 Commit: 080ac37fb5ff5f6b8781863866e8099eb9be4dba Parents: 53cd99f Author: Takeshi YAMAMURO Authored: Thu Dec 22 01:53:33 2016 +0100 Committer: Herman van Hovell Committed: Thu Dec 22 01:54:00 2016 +0100 -- .../apache/spark/sql/execution/BufferedRowIterator.java | 10 ++ .../spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../main/scala/org/apache/spark/sql/execution/limit.scala | 6 +++--- .../org/apache/spark/sql/DataFrameAggregateSuite.scala| 8 4 files changed, 22 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/080ac37f/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java index 086547c..730a4ae 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -70,6 +70,16 @@ public abstract class BufferedRowIterator { } /** + * Returns whether this iterator should stop fetching next row from [[CodegenSupport#inputRDDs]]. + * + * If it returns true, the caller should exit the loop that [[InputAdapter]] generates. + * This interface is mainly used to limit the number of input rows. + */ + protected boolean stopEarly() { +return false; + } + + /** * Returns whether `processNext()` should stop processing next row from `input` or not. * * If it returns true, the caller should exit the loop (return from processNext()). 
http://git-wip-us.apache.org/repos/asf/spark/blob/080ac37f/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index fb57ed7..697db39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -239,7 +239,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") val row = ctx.freshName("row") s""" - | while ($input.hasNext()) { + | while ($input.hasNext() && !stopEarly()) { | InternalRow $row = (InternalRow) $input.next(); | ${consume(ctx, null, row).trim} | if (shouldStop()) return; http://git-wip-us.apache.org/repos/asf/spark/blob/080ac37f/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
spark git commit: [SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer
Repository: spark Updated Branches: refs/heads/branch-2.1 60e02a173 -> 021952d58 [SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer ## What changes were proposed in this pull request? This pr is to fix an `NullPointerException` issue caused by a following `limit + aggregate` query; ``` scala> val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value") scala> df.limit(2).groupBy("id").count().show WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) ``` The root culprit is that [`$doAgg()`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L596) skips an initialization of [the buffer iterator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L603); `BaseLimitExec` sets `stopEarly=true` and `$doAgg()` exits in the middle without the initialization. ## How was this patch tested? Added a test to check if no exception happens for limit + aggregates in `DataFrameAggregateSuite.scala`. Author: Takeshi YAMAMURO Closes #15980 from maropu/SPARK-18528. (cherry picked from commit b41ec997786e2be42a8a2a182212a610d08b221b) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/021952d5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/021952d5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/021952d5 Branch: refs/heads/branch-2.1 Commit: 021952d5808715d0b9d6c716f8b67cd550f7982e Parents: 60e02a1 Author: Takeshi YAMAMURO Authored: Thu Dec 22 01:53:33 2016 +0100 Committer: Herman van Hovell Committed: Thu Dec 22 01:53:44 2016 +0100 -- .../apache/spark/sql/execution/BufferedRowIterator.java | 10 ++ .../spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../main/scala/org/apache/spark/sql/execution/limit.scala | 6 +++--- .../org/apache/spark/sql/DataFrameAggregateSuite.scala| 8 4 files changed, 22 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/021952d5/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java index 086547c..730a4ae 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -70,6 +70,16 @@ public abstract class BufferedRowIterator { } /** + * Returns whether this iterator should stop fetching next row from [[CodegenSupport#inputRDDs]]. + * + * If it returns true, the caller should exit the loop that [[InputAdapter]] generates. + * This interface is mainly used to limit the number of input rows. + */ + protected boolean stopEarly() { +return false; + } + + /** * Returns whether `processNext()` should stop processing next row from `input` or not. * * If it returns true, the caller should exit the loop (return from processNext()). 
http://git-wip-us.apache.org/repos/asf/spark/blob/021952d5/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 516b9d5..2ead8f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -241,7 +241,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") val row = ctx.freshName("row") s""" - | while ($input.hasNext()) { + | while ($input.hasNext() && !stopEarly()) { | InternalRow $row = (InternalRow) $input.next(); | ${consume(ctx, null, row).trim} | if (shouldStop()) return; http://git-wip-us.apache.org/repos/asf/spark/blob/021952d5/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
spark git commit: [SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer
Repository: spark Updated Branches: refs/heads/master 83a6ace0d -> b41ec9977 [SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer ## What changes were proposed in this pull request? This pr is to fix an `NullPointerException` issue caused by a following `limit + aggregate` query; ``` scala> val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value") scala> df.limit(2).groupBy("id").count().show WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) ``` The root culprit is that [`$doAgg()`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L596) skips an initialization of [the buffer iterator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L603); `BaseLimitExec` sets `stopEarly=true` and `$doAgg()` exits in the middle without the initialization. ## How was this patch tested? Added a test to check if no exception happens for limit + aggregates in `DataFrameAggregateSuite.scala`. Author: Takeshi YAMAMURO Closes #15980 from maropu/SPARK-18528. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b41ec997 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b41ec997 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b41ec997 Branch: refs/heads/master Commit: b41ec997786e2be42a8a2a182212a610d08b221b Parents: 83a6ace Author: Takeshi YAMAMURO Authored: Thu Dec 22 01:53:33 2016 +0100 Committer: Herman van Hovell Committed: Thu Dec 22 01:53:33 2016 +0100 -- .../apache/spark/sql/execution/BufferedRowIterator.java | 10 ++ .../spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../main/scala/org/apache/spark/sql/execution/limit.scala | 6 +++--- .../org/apache/spark/sql/DataFrameAggregateSuite.scala| 8 4 files changed, 22 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b41ec997/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java index 086547c..730a4ae 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -70,6 +70,16 @@ public abstract class BufferedRowIterator { } /** + * Returns whether this iterator should stop fetching next row from [[CodegenSupport#inputRDDs]]. + * + * If it returns true, the caller should exit the loop that [[InputAdapter]] generates. + * This interface is mainly used to limit the number of input rows. + */ + protected boolean stopEarly() { +return false; + } + + /** * Returns whether `processNext()` should stop processing next row from `input` or not. * * If it returns true, the caller should exit the loop (return from processNext()). 
http://git-wip-us.apache.org/repos/asf/spark/blob/b41ec997/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 516b9d5..2ead8f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -241,7 +241,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") val row = ctx.freshName("row") s""" - | while ($input.hasNext()) { + | while ($input.hasNext() && !stopEarly()) { | InternalRow $row = (InternalRow) $input.next(); | ${consume(ctx, null, row).trim} | if (shouldStop()) return; http://git-wip-us.apache.org/repos/asf/spark/blob/b41ec997/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/c
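For reference, the reproduction from the PR description as a small standalone program. Before this fix, a `limit` followed by an aggregate could throw a NullPointerException inside generated code because the aggregation buffer iterator was never initialized when `stopEarly()` cut the input loop short:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("limit-agg-example").master("local[2]").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
// Previously failed with a NullPointerException in agg_doAggregateWithKeys; now completes.
df.limit(2).groupBy("id").count().show()
spark.stop()
```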
spark git commit: [SPARK-18234][SS] Made update mode public
Repository: spark Updated Branches: refs/heads/branch-2.1 17ef57fe8 -> 60e02a173 [SPARK-18234][SS] Made update mode public ## What changes were proposed in this pull request? Made update mode public. As part of that here are the changes. - Update DatastreamWriter to accept "update" - Changed package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst - Added update mode state removing with watermark to StateStoreSaveExec ## How was this patch tested? Added new tests in changed modules Author: Tathagata Das Closes #16360 from tdas/SPARK-18234. (cherry picked from commit 83a6ace0d1be44f70e768348ae6688798c84343e) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60e02a17 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60e02a17 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60e02a17 Branch: refs/heads/branch-2.1 Commit: 60e02a173ddf335d58852e56611131ec4409ae8b Parents: 17ef57f Author: Tathagata Das Authored: Wed Dec 21 16:43:17 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 21 16:43:25 2016 -0800 -- .../apache/spark/sql/streaming/OutputMode.java | 12 +- .../apache/spark/sql/InternalOutputModes.scala | 47 --- .../analysis/UnsupportedOperationChecker.scala | 3 +- .../streaming/InternalOutputModes.scala | 47 +++ .../analysis/UnsupportedOperationsSuite.scala | 2 +- .../sql/execution/datasources/DataSource.scala | 2 +- .../execution/streaming/StatefulAggregate.scala | 61 ++-- .../spark/sql/execution/streaming/memory.scala | 5 +- .../spark/sql/streaming/DataStreamWriter.scala | 17 +- .../execution/streaming/MemorySinkSuite.scala | 287 +++ .../sql/streaming/EventTimeWatermarkSuite.scala | 55 +++- .../sql/streaming/FileStreamSinkSuite.scala | 22 +- .../sql/streaming/FileStreamSourceSuite.scala | 2 +- .../spark/sql/streaming/MemorySinkSuite.scala | 274 -- .../spark/sql/streaming/StreamSuite.scala | 8 +- .../streaming/StreamingAggregationSuite.scala | 2 +- .../test/DataStreamReaderWriterSuite.scala | 38 ++- 17 files changed, 507 insertions(+), 377 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/60e02a17/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java index a515c1a..cf0579f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java @@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming; import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.InternalOutputModes; +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes; /** * :: Experimental :: @@ -54,4 +54,14 @@ public class OutputMode { public static OutputMode Complete() { return InternalOutputModes.Complete$.MODULE$; } + + /** + * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will + * be written to the sink every time there are some updates. 
+ * + * @since 2.1.1 + */ + public static OutputMode Update() { +return InternalOutputModes.Update$.MODULE$; + } } http://git-wip-us.apache.org/repos/asf/spark/blob/60e02a17/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala deleted file mode 100644 index 594c41c..000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permis
spark git commit: [SPARK-18234][SS] Made update mode public
Repository: spark Updated Branches: refs/heads/master afd9bc1d8 -> 83a6ace0d [SPARK-18234][SS] Made update mode public ## What changes were proposed in this pull request? Made update mode public. As part of that here are the changes. - Update DatastreamWriter to accept "update" - Changed package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst - Added update mode state removing with watermark to StateStoreSaveExec ## How was this patch tested? Added new tests in changed modules Author: Tathagata Das Closes #16360 from tdas/SPARK-18234. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83a6ace0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83a6ace0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83a6ace0 Branch: refs/heads/master Commit: 83a6ace0d1be44f70e768348ae6688798c84343e Parents: afd9bc1 Author: Tathagata Das Authored: Wed Dec 21 16:43:17 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 21 16:43:17 2016 -0800 -- .../apache/spark/sql/streaming/OutputMode.java | 12 +- .../apache/spark/sql/InternalOutputModes.scala | 47 --- .../analysis/UnsupportedOperationChecker.scala | 3 +- .../streaming/InternalOutputModes.scala | 47 +++ .../analysis/UnsupportedOperationsSuite.scala | 2 +- .../sql/execution/datasources/DataSource.scala | 2 +- .../execution/streaming/StatefulAggregate.scala | 61 ++-- .../spark/sql/execution/streaming/memory.scala | 5 +- .../spark/sql/streaming/DataStreamWriter.scala | 17 +- .../execution/streaming/MemorySinkSuite.scala | 287 +++ .../sql/streaming/EventTimeWatermarkSuite.scala | 55 +++- .../sql/streaming/FileStreamSinkSuite.scala | 22 +- .../sql/streaming/FileStreamSourceSuite.scala | 2 +- .../spark/sql/streaming/MemorySinkSuite.scala | 274 -- .../spark/sql/streaming/StreamSuite.scala | 8 +- .../streaming/StreamingAggregationSuite.scala | 2 +- .../test/DataStreamReaderWriterSuite.scala | 38 ++- 17 files changed, 507 insertions(+), 377 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java index a515c1a..cf0579f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java @@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming; import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.InternalOutputModes; +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes; /** * :: Experimental :: @@ -54,4 +54,14 @@ public class OutputMode { public static OutputMode Complete() { return InternalOutputModes.Complete$.MODULE$; } + + /** + * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will + * be written to the sink every time there are some updates. 
+ * + * @since 2.1.1 + */ + public static OutputMode Update() { +return InternalOutputModes.Update$.MODULE$; + } } http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala deleted file mode 100644 index 594c41c..000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.spark
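For illustration only (not part of the patch above): a minimal sketch of a streaming aggregation run with the newly public update mode. The socket source, console sink, and host/port values are assumptions made for this example.

```Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder.appName("update-mode-sketch").getOrCreate()
import spark.implicits._

// Running word counts; under update mode only the rows whose counts changed
// since the last trigger are emitted to the sink.
val counts = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]
  .groupBy("value")
  .count()

val query = counts.writeStream
  .outputMode(OutputMode.Update())   // or the string form: .outputMode("update")
  .format("console")
  .start()

query.awaitTermination()
```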
spark git commit: [SPARK-17807][CORE] split test-tags into test-JAR
Repository: spark Updated Branches: refs/heads/master 95efc895e -> afd9bc1d8 [SPARK-17807][CORE] split test-tags into test-JAR Remove spark-tag's compile-scope dependency (and, indirectly, spark-core's compile-scope transitive-dependency) on scalatest by splitting test-oriented tags into spark-tags' test JAR. Alternative to #16303. Author: Ryan Williams Closes #16311 from ryan-williams/tt. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afd9bc1d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afd9bc1d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afd9bc1d Branch: refs/heads/master Commit: afd9bc1d8a85adf88c412d8bc75e46e7ecb4bcdd Parents: 95efc89 Author: Ryan Williams Authored: Wed Dec 21 16:37:20 2016 -0800 Committer: Marcelo Vanzin Committed: Wed Dec 21 16:37:20 2016 -0800 -- common/network-common/pom.xml | 12 + common/network-shuffle/pom.xml | 12 + common/network-yarn/pom.xml | 11 common/sketch/pom.xml | 12 + common/tags/pom.xml | 8 -- .../java/org/apache/spark/tags/DockerTest.java | 26 --- .../org/apache/spark/tags/ExtendedHiveTest.java | 27 .../org/apache/spark/tags/ExtendedYarnTest.java | 27 .../java/org/apache/spark/tags/DockerTest.java | 26 +++ .../org/apache/spark/tags/ExtendedHiveTest.java | 27 .../org/apache/spark/tags/ExtendedYarnTest.java | 27 common/unsafe/pom.xml | 12 + core/pom.xml| 12 + external/docker-integration-tests/pom.xml | 2 +- external/flume-sink/pom.xml | 12 + external/flume/pom.xml | 12 + external/java8-tests/pom.xml| 12 + external/kafka-0-10-sql/pom.xml | 12 + external/kafka-0-10/pom.xml | 12 + external/kafka-0-8/pom.xml | 12 + external/kinesis-asl/pom.xml| 12 + graphx/pom.xml | 12 + launcher/pom.xml| 11 mllib-local/pom.xml | 12 + mllib/pom.xml | 12 + pom.xml | 6 + repl/pom.xml| 12 + resource-managers/yarn/pom.xml | 2 ++ sql/catalyst/pom.xml| 12 + sql/core/pom.xml| 12 + sql/hive-thriftserver/pom.xml | 12 + sql/hive/pom.xml| 2 ++ streaming/pom.xml | 11 33 files changed, 352 insertions(+), 89 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/afd9bc1d/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index b63c0ca..8657af7 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -91,6 +91,18 @@ org.apache.spark spark-tags_${scala.binary.version} + + + + org.apache.spark + spark-tags_${scala.binary.version} + test-jar + test + + org.mockito mockito-core http://git-wip-us.apache.org/repos/asf/spark/blob/afd9bc1d/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 5fc92af..24c10fb 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -70,6 +70,18 @@ org.apache.spark spark-tags_${scala.binary.version} + + + + org.apache.spark + spark-tags_${scala.binary.version} + test-jar + test + + log4j log4j http://git-wip-us.apache.org/repos/asf/spark/blob/afd9bc1d/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 9fcd636..5e5a80b 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -50,6 +50,17 @@ spark-tags_${scala.binary.version} + + + org.apache.spark + spark-tags_${scala.binary.version} + test-jar + test + + org.apache.hadoop http://git-wip-us.apache.org/repos/asf/spark/
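As a hedged illustration of how the relocated tags are consumed (the suite below is invented, not from the patch): downstream modules pull the annotations from spark-tags' test JAR and use them to mark suites that a build can include or exclude by tag, e.g. ScalaTest's runner can typically skip them with `-l org.apache.spark.tags.ExtendedYarnTest`.

```Scala
import org.apache.spark.tags.ExtendedYarnTest
import org.scalatest.FunSuite

// Tagging the whole suite; the suite name and test body are illustrative only.
@ExtendedYarnTest
class HypotheticalYarnSmokeSuite extends FunSuite {
  test("placeholder") {
    assert(1 + 1 === 2)
  }
}
```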
spark git commit: [SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix the flaky test
Repository: spark Updated Branches: refs/heads/branch-2.1 0e51bb085 -> 17ef57fe8 [SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix the flaky test ## What changes were proposed in this pull request? When KafkaSource fails on Kafka errors, we should create a new consumer to retry rather than using the existing broken one because it's possible that the broken one will fail again. This PR also assigns a new group id to the new created consumer for a possible race condition: the broken consumer cannot talk with the Kafka cluster in `close` but the new consumer can talk to Kafka cluster. I'm not sure if this will happen or not. Just for safety to avoid that the Kafka cluster thinks there are two consumers with the same group id in a short time window. (Note: CachedKafkaConsumer doesn't need this fix since `assign` never uses the group id.) ## How was this patch tested? In https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/70370/console , it ran this flaky test 120 times and all passed. Author: Shixiong Zhu Closes #16282 from zsxwing/kafka-fix. (cherry picked from commit 95efc895e929701a605313b87ad0cd91edee2f81) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17ef57fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17ef57fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17ef57fe Branch: refs/heads/branch-2.1 Commit: 17ef57fe8dab7616200fdd9c00ff29f716459321 Parents: 0e51bb0 Author: Shixiong Zhu Authored: Wed Dec 21 15:39:36 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 21 15:39:54 2016 -0800 -- dev/sparktestsupport/modules.py | 3 +- .../apache/spark/sql/kafka010/KafkaSource.scala | 58 ++-- .../sql/kafka010/KafkaSourceProvider.scala | 21 +++ .../spark/sql/kafka010/KafkaSourceSuite.scala | 2 +- 4 files changed, 52 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/17ef57fe/dev/sparktestsupport/modules.py -- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index b34ab51..0cf078c 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -245,7 +245,8 @@ streaming_kafka_0_10 = Module( name="streaming-kafka-0-10", dependencies=[streaming], source_file_regexes=[ -"external/kafka-0-10", +# The ending "/" is necessary otherwise it will include "sql-kafka" codes +"external/kafka-0-10/", "external/kafka-0-10-assembly", ], sbt_test_goals=[ http://git-wip-us.apache.org/repos/asf/spark/blob/17ef57fe/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 92ee0ed..43b8d9d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -24,7 +24,7 @@ import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ import scala.util.control.NonFatal -import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer, OffsetOutOfRangeException} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, OffsetOutOfRangeException} import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.TopicPartition @@ -81,14 
+81,16 @@ import org.apache.spark.util.UninterruptibleThread * To avoid this issue, you should make sure stopping the query before stopping the Kafka brokers * and not use wrong broker addresses. */ -private[kafka010] case class KafkaSource( +private[kafka010] class KafkaSource( sqlContext: SQLContext, consumerStrategy: ConsumerStrategy, +driverKafkaParams: ju.Map[String, Object], executorKafkaParams: ju.Map[String, Object], sourceOptions: Map[String, String], metadataPath: String, startingOffsets: StartingOffsets, -failOnDataLoss: Boolean) +failOnDataLoss: Boolean, +driverGroupIdPrefix: String) extends Source with Logging { private val sc = sqlContext.sparkContext @@ -107,11 +109,31 @@ private[kafka010] case class KafkaSource( private val maxOffsetsPerTrigger = sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) + private var groupId: String = null + + private var nextId = 0 + + private def nextGroupId(): String = { +
spark git commit: [SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix the flaky test
Repository: spark Updated Branches: refs/heads/master 354e93618 -> 95efc895e [SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix the flaky test ## What changes were proposed in this pull request? When KafkaSource fails on Kafka errors, we should create a new consumer to retry rather than using the existing broken one because it's possible that the broken one will fail again. This PR also assigns a new group id to the new created consumer for a possible race condition: the broken consumer cannot talk with the Kafka cluster in `close` but the new consumer can talk to Kafka cluster. I'm not sure if this will happen or not. Just for safety to avoid that the Kafka cluster thinks there are two consumers with the same group id in a short time window. (Note: CachedKafkaConsumer doesn't need this fix since `assign` never uses the group id.) ## How was this patch tested? In https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/70370/console , it ran this flaky test 120 times and all passed. Author: Shixiong Zhu Closes #16282 from zsxwing/kafka-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95efc895 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95efc895 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95efc895 Branch: refs/heads/master Commit: 95efc895e929701a605313b87ad0cd91edee2f81 Parents: 354e936 Author: Shixiong Zhu Authored: Wed Dec 21 15:39:36 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 21 15:39:36 2016 -0800 -- dev/sparktestsupport/modules.py | 3 +- .../apache/spark/sql/kafka010/KafkaSource.scala | 58 ++-- .../sql/kafka010/KafkaSourceProvider.scala | 21 +++ .../spark/sql/kafka010/KafkaSourceSuite.scala | 2 +- 4 files changed, 52 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/95efc895/dev/sparktestsupport/modules.py -- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 1a7cf9a..10ad1fe 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -245,7 +245,8 @@ streaming_kafka_0_10 = Module( name="streaming-kafka-0-10", dependencies=[streaming], source_file_regexes=[ -"external/kafka-0-10", +# The ending "/" is necessary otherwise it will include "sql-kafka" codes +"external/kafka-0-10/", "external/kafka-0-10-assembly", ], sbt_test_goals=[ http://git-wip-us.apache.org/repos/asf/spark/blob/95efc895/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 92ee0ed..43b8d9d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -24,7 +24,7 @@ import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ import scala.util.control.NonFatal -import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer, OffsetOutOfRangeException} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, OffsetOutOfRangeException} import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.TopicPartition @@ -81,14 +81,16 @@ import org.apache.spark.util.UninterruptibleThread * To avoid this issue, you should make sure 
stopping the query before stopping the Kafka brokers * and not use wrong broker addresses. */ -private[kafka010] case class KafkaSource( +private[kafka010] class KafkaSource( sqlContext: SQLContext, consumerStrategy: ConsumerStrategy, +driverKafkaParams: ju.Map[String, Object], executorKafkaParams: ju.Map[String, Object], sourceOptions: Map[String, String], metadataPath: String, startingOffsets: StartingOffsets, -failOnDataLoss: Boolean) +failOnDataLoss: Boolean, +driverGroupIdPrefix: String) extends Source with Logging { private val sc = sqlContext.sparkContext @@ -107,11 +109,31 @@ private[kafka010] case class KafkaSource( private val maxOffsetsPerTrigger = sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) + private var groupId: String = null + + private var nextId = 0 + + private def nextGroupId(): String = { +groupId = driverGroupIdPrefix + "-" + nextId +nextId += 1 +groupId + } + /** * A KafkaConsum
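A minimal, self-contained sketch of the retry idea described above. This is not the KafkaSource code itself; the helper class is illustrative, and it assumes `baseParams` already carries the deserializer settings a consumer needs.

```Scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

class RecreatingConsumer(baseParams: ju.Map[String, Object], groupIdPrefix: String) {
  private var nextId = 0
  private var consumer = createConsumer()

  // Each new consumer gets a fresh group id, so it cannot collide with a broken
  // consumer that has not finished closing yet.
  private def nextGroupId(): String = {
    val id = s"$groupIdPrefix-$nextId"
    nextId += 1
    id
  }

  private def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
    val params = new ju.HashMap[String, Object](baseParams)
    params.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
    new KafkaConsumer[Array[Byte], Array[Byte]](params)
  }

  /** Run `body`; on any failure, discard the broken consumer and build a new one before rethrowing. */
  def withConsumer[T](body: KafkaConsumer[Array[Byte], Array[Byte]] => T): T = {
    try body(consumer) catch {
      case e: Throwable =>
        try consumer.close() catch { case _: Throwable => /* best effort */ }
        consumer = createConsumer()
        throw e   // the caller decides whether to retry with the fresh consumer
    }
  }
}
```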
spark git commit: [SPARK-18775][SQL] Limit the max number of records written per file
Repository: spark Updated Branches: refs/heads/master 078c71c2d -> 354e93618 [SPARK-18775][SQL] Limit the max number of records written per file ## What changes were proposed in this pull request? Currently, Spark writes a single file out per task, sometimes leading to very large files. It would be great to have an option to limit the max number of records written per file in a task, to avoid humongous files. This patch introduces a new write config option `maxRecordsPerFile` (default to a session-wide setting `spark.sql.files.maxRecordsPerFile`) that limits the max number of records written to a single file. A non-positive value indicates there is no limit (same behavior as not having this flag). ## How was this patch tested? Added test cases in PartitionedWriteSuite for both dynamic partition insert and non-dynamic partition insert. Author: Reynold Xin Closes #16204 from rxin/SPARK-18775. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/354e9361 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/354e9361 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/354e9361 Branch: refs/heads/master Commit: 354e936187708a404c0349e3d8815a47953123ec Parents: 078c71c Author: Reynold Xin Authored: Wed Dec 21 23:50:35 2016 +0100 Committer: Herman van Hovell Committed: Wed Dec 21 23:50:35 2016 +0100 -- .../datasources/FileFormatWriter.scala | 109 ++- .../org/apache/spark/sql/internal/SQLConf.scala | 26 +++-- .../datasources/BucketingUtilsSuite.scala | 46 .../sql/sources/PartitionedWriteSuite.scala | 37 +++ 4 files changed, 179 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/354e9361/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index d560ad5..1eb4541 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -31,13 +31,12 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage -import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{QueryExecution, SQLExecution, UnsafeKVExternalSorter} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -47,6 +46,13 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter /** A helper object for writing FileFormat data out to a location. */ object FileFormatWriter extends Logging { + /** + * Max number of files a single task writes out due to file size. In most cases the number of + * files written should be very small. 
This is just a safe guard to protect some really bad + * settings, e.g. maxRecordsPerFile = 1. + */ + private val MAX_FILE_COUNTER = 1000 * 1000 + /** Describes how output files should be placed in the filesystem. */ case class OutputSpec( outputPath: String, customPartitionLocations: Map[TablePartitionSpec, String]) @@ -61,7 +67,8 @@ object FileFormatWriter extends Logging { val nonPartitionColumns: Seq[Attribute], val bucketSpec: Option[BucketSpec], val path: String, - val customPartitionLocations: Map[TablePartitionSpec, String]) + val customPartitionLocations: Map[TablePartitionSpec, String], + val maxRecordsPerFile: Long) extends Serializable { assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ nonPartitionColumns), @@ -116,7 +123,10 @@ object FileFormatWriter extends Logging { nonPartitionColumns = dataColumns, bucketSpec = bucketSpec, path = outputSpec.outputPath, - customPartitionLocations = outputSpec.customPartitionLocations) + customPartitionLocations = outputSpec.customPartitionLocations, + maxRecordsPer
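A short usage sketch of the new knob; the output path and row counts below are made up for the example.

```Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("max-records-per-file-sketch").getOrCreate()

// Session-wide default: no single output file gets more than 1M records.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)

// Per-write override; a non-positive value means "no limit".
spark.range(0, 10000000L).toDF("id").write
  .option("maxRecordsPerFile", 500000)
  .mode("overwrite")
  .parquet("/tmp/spark-18775-example")
```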
spark git commit: [SPARK-18949][SQL][BACKPORT-2.1] Add recoverPartitions API to Catalog
Repository: spark Updated Branches: refs/heads/branch-2.1 318483421 -> 0e51bb085 [SPARK-18949][SQL][BACKPORT-2.1] Add recoverPartitions API to Catalog ### What changes were proposed in this pull request? This PR is to backport https://github.com/apache/spark/pull/16356 to Spark 2.1.1 branch. Currently, we only have a SQL interface for recovering all the partitions in the directory of a table and update the catalog. `MSCK REPAIR TABLE` or `ALTER TABLE table RECOVER PARTITIONS`. (Actually, very hard for me to remember `MSCK` and have no clue what it means) After the new "Scalable Partition Handling", the table repair becomes much more important for making visible the data in the created data source partitioned table. Thus, this PR is to add it into the Catalog interface. After this PR, users can repair the table by ```Scala spark.catalog.recoverPartitions("testTable") ``` ### How was this patch tested? Modified the existing test cases. Author: gatorsmile Closes #16372 from gatorsmile/repairTable2.1.1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e51bb08 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e51bb08 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e51bb08 Branch: refs/heads/branch-2.1 Commit: 0e51bb085446a482c22eaef93aea513610f41f48 Parents: 3184834 Author: gatorsmile Authored: Wed Dec 21 13:55:40 2016 -0800 Committer: Reynold Xin Committed: Wed Dec 21 13:55:40 2016 -0800 -- project/MimaExcludes.scala| 5 - python/pyspark/sql/catalog.py | 5 + .../scala/org/apache/spark/sql/catalog/Catalog.scala | 7 +++ .../org/apache/spark/sql/internal/CatalogImpl.scala | 14 ++ .../hive/PartitionProviderCompatibilitySuite.scala| 6 +++--- 5 files changed, 33 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/project/MimaExcludes.scala -- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 978a328..6d1b4d2 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -110,7 +110,10 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query"), + + // [SPARK-18949] [SQL] Add repairTable API to Catalog + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions") ) } http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/python/pyspark/sql/catalog.py -- diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index a36d02e..30c7a3f 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -258,6 +258,11 @@ class Catalog(object): """Invalidate and refresh all the cached metadata of the given table.""" self._jcatalog.refreshTable(tableName) +@since('2.1.1') +def recoverPartitions(self, tableName): +"""Recover all the partitions of the given table and update the catalog.""" +self._jcatalog.recoverPartitions(tableName) + def _reset(self): """(Internal use only) Drop all existing databases (except 
"default"), tables, partitions and functions, and set the current database to "default". http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index aecdda1..6b061f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -301,6 +301,13 @@ abstract class Catalog { def dropGlobalTempView(viewName: String): Boolean /** + * Recover all the partitions in the directory of a table and update the catalog. + * + * @since 2.1.1 + */ + def recoverPartitions(tableName: String): Unit + + /** * Returns true
spark git commit: [SPARK-18700][SQL][BACKPORT-2.0] Add StripedLock for each table's relation in cache
Repository: spark Updated Branches: refs/heads/branch-2.0 5f8c0b742 -> 53cd99f65 [SPARK-18700][SQL][BACKPORT-2.0] Add StripedLock for each table's relation in cache ## What changes were proposed in this pull request? Backport of #16135 to branch-2.0 ## How was this patch tested? Because of the diff between branch-2.0 and master/2.1, this backport adds a multi-threaded table access test in `HiveMetadataCacheSuite` and checks, via the metrics in `HiveCatalogMetrics`, that the relation is loaded only once. Author: xuanyuanking Closes #16350 from xuanyuanking/SPARK-18700-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53cd99f6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53cd99f6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53cd99f6 Branch: refs/heads/branch-2.0 Commit: 53cd99f65667c4d49db000101460a9d266f199e8 Parents: 5f8c0b7 Author: xuanyuanking Authored: Wed Dec 21 22:55:42 2016 +0100 Committer: Herman van Hovell Committed: Wed Dec 21 22:55:42 2016 +0100 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 147 +++ 1 file changed, 82 insertions(+), 65 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/53cd99f6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index e7d1ed3..670400f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.google.common.util.concurrent.Striped import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -65,6 +66,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log t.identifier.table.toLowerCase) } + /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */ + private val tableCreationLocks = Striped.lazyWeakLock(100) + + /** Acquires a lock on the table cache for the duration of `f`. */ + private def withTableCreationLock[A](tableName: QualifiedTableName, f: => A): A = { +val lock = tableCreationLocks.get(tableName) +lock.lock() +try f finally { + lock.unlock() +} + } + /** A cache of Spark SQL data source tables that have been accessed. 
*/ protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { @@ -274,77 +287,81 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log partitionPaths } - val cached = getCached( -tableIdentifier, -paths, -metastoreRelation, -metastoreSchema, -fileFormatClass, -bucketSpec, -Some(partitionSpec)) - - val hadoopFsRelation = cached.getOrElse { -val fileCatalog = new MetaStorePartitionedTableFileCatalog( - sparkSession, - new Path(metastoreRelation.catalogTable.storage.locationUri.get), - partitionSpec) - -val inferredSchema = if (fileType.equals("parquet")) { - val inferredSchema = -defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) - inferredSchema.map { inferred => -ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) - }.getOrElse(metastoreSchema) -} else { - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get -} + withTableCreationLock(tableIdentifier, { +val cached = getCached( + tableIdentifier, + paths, + metastoreRelation, + metastoreSchema, + fileFormatClass, + bucketSpec, + Some(partitionSpec)) + +val hadoopFsRelation = cached.getOrElse { + val fileCatalog = new MetaStorePartitionedTableFileCatalog( +sparkSession, +new Path(metastoreRelation.catalogTable.storage.locationUri.get), +partitionSpec) + + val inferredSchema = if (fileType.equals("parquet")) { +val inferredSchema = + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) +inferredSchema.map { inferred => + ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) +}.getOrElse(metastoreSchema) +
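The Guava `Striped` pattern used above, reduced to a self-contained sketch; the object and method names below are illustrative, not Spark's.

```Scala
import com.google.common.util.concurrent.Striped

object TableLoadGuard {
  // At most 100 distinct, weakly referenced locks; equal keys map to the same
  // lock, so concurrent loads of one table serialize while loads of different
  // tables can proceed in parallel.
  private val tableCreationLocks = Striped.lazyWeakLock(100)

  def withTableLock[A](tableName: String)(f: => A): A = {
    val lock = tableCreationLocks.get(tableName)
    lock.lock()
    try f finally lock.unlock()
  }
}

// Usage sketch: only one thread at a time builds the cached relation for "db.tbl".
// TableLoadGuard.withTableLock("db.tbl") { expensiveRelationLoad("db.tbl") }
```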
spark git commit: [SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window
Repository: spark Updated Branches: refs/heads/branch-2.0 ef206ace2 -> 5f8c0b742 [SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window ## What changes were proposed in this pull request? The issue in this test is the cleanup of RDDs may not be able to finish before stopping StreamingContext. This PR basically just puts the assertions into `eventually` and runs it before stopping StreamingContext. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16362 from zsxwing/SPARK-18954. (cherry picked from commit 078c71c2dcbb1470d22f8eb8138fb17e3d7c2414) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f8c0b74 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f8c0b74 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f8c0b74 Branch: refs/heads/branch-2.0 Commit: 5f8c0b742d9a6bc82dd257075ba4c58b1257a476 Parents: ef206ac Author: Shixiong Zhu Authored: Wed Dec 21 11:59:21 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 21 11:59:38 2016 -0800 -- .../spark/streaming/BasicOperationsSuite.scala | 98 .../apache/spark/streaming/TestSuiteBase.scala | 19 +++- 2 files changed, 73 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5f8c0b74/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index cfcbdc7..4a925fc 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming import java.util.concurrent.ConcurrentLinkedQueue -import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.existentials import scala.reflect.ClassTag +import org.scalatest.concurrent.Eventually.eventually + import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, WindowedDStream} @@ -591,48 +591,57 @@ class BasicOperationsSuite extends TestSuiteBase { .window(Seconds(4), Seconds(2)) } -val operatedStream = runCleanupTest(conf, operation _, - numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3)) -val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]] -val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]] -val mappedStream = windowedStream1.dependencies.head - -// Checkpoint remember durations -assert(windowedStream2.rememberDuration === rememberDuration) -assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration) -assert(mappedStream.rememberDuration === - rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration) - -// WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7 -// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4 -// MappedStream should remember till 2 seconds:10, 9, 8, 7, 6, 5, 4, 3, 2 - -// WindowedStream2 -assert(windowedStream2.generatedRDDs.contains(Time(1))) -assert(windowedStream2.generatedRDDs.contains(Time(8000))) -assert(!windowedStream2.generatedRDDs.contains(Time(6000))) - 
-// WindowedStream1 -assert(windowedStream1.generatedRDDs.contains(Time(1))) -assert(windowedStream1.generatedRDDs.contains(Time(4000))) -assert(!windowedStream1.generatedRDDs.contains(Time(3000))) - -// MappedStream -assert(mappedStream.generatedRDDs.contains(Time(1))) -assert(mappedStream.generatedRDDs.contains(Time(2000))) -assert(!mappedStream.generatedRDDs.contains(Time(1000))) +runCleanupTest( +conf, +operation _, +numExpectedOutput = cleanupTestInput.size / 2, +rememberDuration = Seconds(3)) { operatedStream => + eventually(eventuallyTimeout) { +val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]] +val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]] +val mappedStream = windowedStream1.dependencies.head + +// Checkpoint remember durations +assert(windowedStream2.rememberDuration === rememberDuration) +assert( + windowedStream1.rememberDuration === rememberDurati
spark git commit: [SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window
Repository: spark Updated Branches: refs/heads/branch-2.1 162bdb910 -> 318483421 [SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window ## What changes were proposed in this pull request? The issue in this test is the cleanup of RDDs may not be able to finish before stopping StreamingContext. This PR basically just puts the assertions into `eventually` and runs it before stopping StreamingContext. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16362 from zsxwing/SPARK-18954. (cherry picked from commit 078c71c2dcbb1470d22f8eb8138fb17e3d7c2414) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31848342 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31848342 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31848342 Branch: refs/heads/branch-2.1 Commit: 318483421adc3c2e22744c4c580917377ce40b3f Parents: 162bdb9 Author: Shixiong Zhu Authored: Wed Dec 21 11:59:21 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 21 11:59:28 2016 -0800 -- .../spark/streaming/BasicOperationsSuite.scala | 98 .../apache/spark/streaming/TestSuiteBase.scala | 19 +++- 2 files changed, 73 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/31848342/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 4e702bb..a3062ac 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming import java.util.concurrent.ConcurrentLinkedQueue -import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.existentials import scala.reflect.ClassTag +import org.scalatest.concurrent.Eventually.eventually + import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, WindowedDStream} @@ -657,48 +657,57 @@ class BasicOperationsSuite extends TestSuiteBase { .window(Seconds(4), Seconds(2)) } -val operatedStream = runCleanupTest(conf, operation _, - numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3)) -val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]] -val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]] -val mappedStream = windowedStream1.dependencies.head - -// Checkpoint remember durations -assert(windowedStream2.rememberDuration === rememberDuration) -assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration) -assert(mappedStream.rememberDuration === - rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration) - -// WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7 -// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4 -// MappedStream should remember till 2 seconds:10, 9, 8, 7, 6, 5, 4, 3, 2 - -// WindowedStream2 -assert(windowedStream2.generatedRDDs.contains(Time(1))) -assert(windowedStream2.generatedRDDs.contains(Time(8000))) -assert(!windowedStream2.generatedRDDs.contains(Time(6000))) - 
-// WindowedStream1 -assert(windowedStream1.generatedRDDs.contains(Time(1))) -assert(windowedStream1.generatedRDDs.contains(Time(4000))) -assert(!windowedStream1.generatedRDDs.contains(Time(3000))) - -// MappedStream -assert(mappedStream.generatedRDDs.contains(Time(1))) -assert(mappedStream.generatedRDDs.contains(Time(2000))) -assert(!mappedStream.generatedRDDs.contains(Time(1000))) +runCleanupTest( +conf, +operation _, +numExpectedOutput = cleanupTestInput.size / 2, +rememberDuration = Seconds(3)) { operatedStream => + eventually(eventuallyTimeout) { +val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]] +val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]] +val mappedStream = windowedStream1.dependencies.head + +// Checkpoint remember durations +assert(windowedStream2.rememberDuration === rememberDuration) +assert( + windowedStream1.rememberDuration === rememberDurati
spark git commit: [SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window
Repository: spark Updated Branches: refs/heads/master ccfe60a83 -> 078c71c2d [SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window ## What changes were proposed in this pull request? The issue in this test is the cleanup of RDDs may not be able to finish before stopping StreamingContext. This PR basically just puts the assertions into `eventually` and runs it before stopping StreamingContext. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16362 from zsxwing/SPARK-18954. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/078c71c2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/078c71c2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/078c71c2 Branch: refs/heads/master Commit: 078c71c2dcbb1470d22f8eb8138fb17e3d7c2414 Parents: ccfe60a Author: Shixiong Zhu Authored: Wed Dec 21 11:59:21 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 21 11:59:21 2016 -0800 -- .../spark/streaming/BasicOperationsSuite.scala | 98 .../apache/spark/streaming/TestSuiteBase.scala | 19 +++- 2 files changed, 73 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/078c71c2/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 4e702bb..a3062ac 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming import java.util.concurrent.ConcurrentLinkedQueue -import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.existentials import scala.reflect.ClassTag +import org.scalatest.concurrent.Eventually.eventually + import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, WindowedDStream} @@ -657,48 +657,57 @@ class BasicOperationsSuite extends TestSuiteBase { .window(Seconds(4), Seconds(2)) } -val operatedStream = runCleanupTest(conf, operation _, - numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3)) -val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]] -val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]] -val mappedStream = windowedStream1.dependencies.head - -// Checkpoint remember durations -assert(windowedStream2.rememberDuration === rememberDuration) -assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration) -assert(mappedStream.rememberDuration === - rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration) - -// WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7 -// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4 -// MappedStream should remember till 2 seconds:10, 9, 8, 7, 6, 5, 4, 3, 2 - -// WindowedStream2 -assert(windowedStream2.generatedRDDs.contains(Time(1))) -assert(windowedStream2.generatedRDDs.contains(Time(8000))) -assert(!windowedStream2.generatedRDDs.contains(Time(6000))) - -// WindowedStream1 -assert(windowedStream1.generatedRDDs.contains(Time(1))) 
-assert(windowedStream1.generatedRDDs.contains(Time(4000))) -assert(!windowedStream1.generatedRDDs.contains(Time(3000))) - -// MappedStream -assert(mappedStream.generatedRDDs.contains(Time(1))) -assert(mappedStream.generatedRDDs.contains(Time(2000))) -assert(!mappedStream.generatedRDDs.contains(Time(1000))) +runCleanupTest( +conf, +operation _, +numExpectedOutput = cleanupTestInput.size / 2, +rememberDuration = Seconds(3)) { operatedStream => + eventually(eventuallyTimeout) { +val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]] +val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]] +val mappedStream = windowedStream1.dependencies.head + +// Checkpoint remember durations +assert(windowedStream2.rememberDuration === rememberDuration) +assert( + windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration) +assert(mappedStream.rememberDuration === + remember
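The core of the fix, as a self-contained sketch (the suite and the asynchronous flag below are invented for illustration): assertions about work that completes asynchronously go inside ScalaTest's `eventually`, which retries the block until it passes or the timeout expires.

```Scala
import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

class AsyncCleanupExampleSuite extends FunSuite {
  test("assert asynchronous cleanup with eventually") {
    @volatile var cleanupDone = false
    new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(200); cleanupDone = true }
    }).start()

    // Without eventually this assertion would be flaky; with it, the block is
    // retried until it succeeds or 10 seconds elapse.
    eventually(timeout(10.seconds)) {
      assert(cleanupDone)
    }
  }
}
```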
spark git commit: [SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality
Repository: spark Updated Branches: refs/heads/branch-2.1 3c8861d92 -> 162bdb910 [SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality ## What changes were proposed in this pull request? The failure is because in `test("basic functionality")`, it doesn't block until `ExecutorAllocationManager.manageAllocation` is called. This PR just adds StreamManualClock to allow the tests to block on expected wait time to make the test deterministic. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16321 from zsxwing/SPARK-18031. (cherry picked from commit ccfe60a8304871779ff1b31b8c2d724f59d5b2af) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/162bdb91 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/162bdb91 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/162bdb91 Branch: refs/heads/branch-2.1 Commit: 162bdb9103ecba99cd73004ede4d55ff8fc8 Parents: 3c8861d Author: Shixiong Zhu Authored: Wed Dec 21 11:17:44 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 21 11:17:53 2016 -0800 -- .../ExecutorAllocationManagerSuite.scala| 36 +--- 1 file changed, 32 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/162bdb91/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index b49e579..1d2bf35 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite private val batchDurationMillis = 1000L private var allocationClient: ExecutorAllocationClient = null - private var clock: ManualClock = null + private var clock: StreamManualClock = null before { allocationClient = mock[ExecutorAllocationClient] -clock = new ManualClock() +clock = new StreamManualClock() } test("basic functionality") { @@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite reset(allocationClient) when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2")) addBatchProcTime(allocationManager, batchProcTimeMs.toLong) -clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1) +val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1 +val expectedWaitTime = clock.getTimeMillis() + advancedTime +clock.advance(advancedTime) +// Make sure ExecutorAllocationManager.manageAllocation is called eventually(timeout(10 seconds)) { - body + assert(clock.isStreamWaitingAt(expectedWaitTime)) } +body } /** Verify that the expected number of total executor were requested */ @@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite } } } + +/** + * A special manual clock that provide `isStreamWaitingAt` to allow the user to check if the clock + * is blocking. 
+ */ +class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable { + private var waitStartTime: Option[Long] = None + + override def waitTillTime(targetTime: Long): Long = synchronized { +try { + waitStartTime = Some(getTimeMillis()) + super.waitTillTime(targetTime) +} finally { + waitStartTime = None +} + } + + /** + * Returns if the clock is blocking and the time it started to block is the parameter `time`. + */ + def isStreamWaitingAt(time: Long): Boolean = synchronized { +waitStartTime == Some(time) + } +} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality
Repository: spark Updated Branches: refs/heads/master 607a1e63d -> ccfe60a83 [SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality ## What changes were proposed in this pull request? The failure is because in `test("basic functionality")`, it doesn't block until `ExecutorAllocationManager.manageAllocation` is called. This PR just adds StreamManualClock to allow the tests to block on expected wait time to make the test deterministic. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16321 from zsxwing/SPARK-18031. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccfe60a8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccfe60a8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccfe60a8 Branch: refs/heads/master Commit: ccfe60a8304871779ff1b31b8c2d724f59d5b2af Parents: 607a1e6 Author: Shixiong Zhu Authored: Wed Dec 21 11:17:44 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 21 11:17:44 2016 -0800 -- .../ExecutorAllocationManagerSuite.scala| 36 +--- 1 file changed, 32 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ccfe60a8/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index b49e579..1d2bf35 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite private val batchDurationMillis = 1000L private var allocationClient: ExecutorAllocationClient = null - private var clock: ManualClock = null + private var clock: StreamManualClock = null before { allocationClient = mock[ExecutorAllocationClient] -clock = new ManualClock() +clock = new StreamManualClock() } test("basic functionality") { @@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite reset(allocationClient) when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2")) addBatchProcTime(allocationManager, batchProcTimeMs.toLong) -clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1) +val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1 +val expectedWaitTime = clock.getTimeMillis() + advancedTime +clock.advance(advancedTime) +// Make sure ExecutorAllocationManager.manageAllocation is called eventually(timeout(10 seconds)) { - body + assert(clock.isStreamWaitingAt(expectedWaitTime)) } +body } /** Verify that the expected number of total executor were requested */ @@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite } } } + +/** + * A special manual clock that provide `isStreamWaitingAt` to allow the user to check if the clock + * is blocking. + */ +class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable { + private var waitStartTime: Option[Long] = None + + override def waitTillTime(targetTime: Long): Long = synchronized { +try { + waitStartTime = Some(getTimeMillis()) + super.waitTillTime(targetTime) +} finally { + waitStartTime = None +} + } + + /** + * Returns if the clock is blocking and the time it started to block is the parameter `time`. 
+ */ + def isStreamWaitingAt(time: Long): Boolean = synchronized { +waitStartTime == Some(time) + } +} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
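A simplified, self-contained sketch of the technique (not the Spark classes themselves): a manual clock that records when a caller starts blocking, so the test advances time only once the code under test is actually parked, making the hand-off deterministic.

```Scala
class BlockingManualClock(private var currentTime: Long = 0L) {
  private var waitStartTime: Option[Long] = None

  def getTimeMillis(): Long = synchronized { currentTime }

  def advance(delta: Long): Unit = synchronized {
    currentTime += delta
    notifyAll()
  }

  /** Blocks until the clock reaches `target`, recording when the wait began. */
  def waitTillTime(target: Long): Long = synchronized {
    waitStartTime = Some(currentTime)
    try {
      while (currentTime < target) wait(10)
      currentTime
    } finally {
      waitStartTime = None
    }
  }

  /** True while some caller is blocked in waitTillTime and began waiting at `t`. */
  def isWaitingAt(t: Long): Boolean = synchronized { waitStartTime.contains(t) }
}

// Usage sketch: the worker stands in for the allocation manager's periodic wait.
val clock = new BlockingManualClock()
val worker = new Thread(new Runnable {
  override def run(): Unit = clock.waitTillTime(5000)
})
worker.start()

// Only advance once the worker is definitely blocked at the expected start time.
while (!clock.isWaitingAt(0)) Thread.sleep(10)
clock.advance(5000)
worker.join()
```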
spark git commit: [SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years
Repository: spark Updated Branches: refs/heads/branch-2.1 bc54a14b4 -> 3c8861d92 [SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years ## What changes were proposed in this pull request? Two changes - Fix how delays specified in months and years are translated to milliseconds - Following up on #16258, not show watermark when there is no watermarking in the query ## How was this patch tested? Updated and new unit tests Author: Tathagata Das Closes #16304 from tdas/SPARK-18834-1. (cherry picked from commit 607a1e63dbc9269b806a9f537e1d041029333cdd) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c8861d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c8861d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c8861d9 Branch: refs/heads/branch-2.1 Commit: 3c8861d924e42ff84615044930fc5531201b9b12 Parents: bc54a14 Author: Tathagata Das Authored: Wed Dec 21 10:44:20 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 21 10:44:27 2016 -0800 -- .../streaming/EventTimeWatermarkExec.scala | 7 +- .../execution/streaming/ProgressReporter.scala | 7 +- .../execution/streaming/StreamExecution.scala | 2 +- .../sql/streaming/EventTimeWatermarkSuite.scala | 287 +++ .../spark/sql/streaming/WatermarkSuite.scala| 240 5 files changed, 299 insertions(+), 244 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index e8570d0..5a9a99e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -84,6 +84,11 @@ case class EventTimeWatermarkExec( child: SparkPlan) extends SparkPlan { val eventTimeStats = new EventTimeStatsAccum() + val delayMs = { +val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 +delay.milliseconds + delay.months * millisPerMonth + } + sparkContext.register(eventTimeStats) override protected def doExecute(): RDD[InternalRow] = { @@ -101,7 +106,7 @@ case class EventTimeWatermarkExec( if (a semanticEquals eventTime) { val updatedMetadata = new MetadataBuilder() .withMetadata(a.metadata) - .putLong(EventTimeWatermark.delayKey, delay.milliseconds) + .putLong(EventTimeWatermark.delayKey, delayMs) .build() a.withMetadata(updatedMetadata) http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 2386f33..c5e9eae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, 
LogicalPlan} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.streaming._ import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent @@ -182,7 +182,10 @@ trait ProgressReporter extends Logging { /** Extracts statistics from the most recent query execution. */ private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { -val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) +val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty +val watermarkTimestamp = + if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + else Map.empty[String, String] if (!hasNewData) { return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp) http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.sc
spark git commit: [SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years
Repository: spark Updated Branches: refs/heads/master 1a6438897 -> 607a1e63d [SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years ## What changes were proposed in this pull request? Two changes - Fix how delays specified in months and years are translated to milliseconds - Following up on #16258, not show watermark when there is no watermarking in the query ## How was this patch tested? Updated and new unit tests Author: Tathagata Das Closes #16304 from tdas/SPARK-18834-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/607a1e63 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/607a1e63 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/607a1e63 Branch: refs/heads/master Commit: 607a1e63dbc9269b806a9f537e1d041029333cdd Parents: 1a64388 Author: Tathagata Das Authored: Wed Dec 21 10:44:20 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 21 10:44:20 2016 -0800 -- .../streaming/EventTimeWatermarkExec.scala | 7 +- .../execution/streaming/ProgressReporter.scala | 7 +- .../execution/streaming/StreamExecution.scala | 2 +- .../sql/streaming/EventTimeWatermarkSuite.scala | 287 +++ .../spark/sql/streaming/WatermarkSuite.scala| 240 5 files changed, 299 insertions(+), 244 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index e8570d0..5a9a99e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -84,6 +84,11 @@ case class EventTimeWatermarkExec( child: SparkPlan) extends SparkPlan { val eventTimeStats = new EventTimeStatsAccum() + val delayMs = { +val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 +delay.milliseconds + delay.months * millisPerMonth + } + sparkContext.register(eventTimeStats) override protected def doExecute(): RDD[InternalRow] = { @@ -101,7 +106,7 @@ case class EventTimeWatermarkExec( if (a semanticEquals eventTime) { val updatedMetadata = new MetadataBuilder() .withMetadata(a.metadata) - .putLong(EventTimeWatermark.delayKey, delay.milliseconds) + .putLong(EventTimeWatermark.delayKey, delayMs) .build() a.withMetadata(updatedMetadata) http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 2386f33..c5e9eae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.streaming._ import 
org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent @@ -182,7 +182,10 @@ trait ProgressReporter extends Logging { /** Extracts statistics from the most recent query execution. */ private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { -val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) +val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty +val watermarkTimestamp = + if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + else Map.empty[String, String] if (!hasNewData) { return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp) http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/
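For reference, a minimal sketch of the fixed delay conversion using Spark's `CalendarInterval` (the helper name `delayToMs` is illustrative and not part of the commit); it assumes the same 31-days-per-month convention as `EventTimeWatermarkExec` above.

```scala
import org.apache.spark.unsafe.types.CalendarInterval

// A CalendarInterval stores its month component separately from its
// microsecond component, so a delay such as "2 months" has milliseconds == 0.
// The fix folds the month component in at 31 days per month instead of
// silently dropping it.
def delayToMs(delay: CalendarInterval): Long = {
  val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
  delay.milliseconds + delay.months * millisPerMonth
}

// "interval 2 months" now maps to 2 * 31 days worth of milliseconds rather
// than 0 ms, so a threshold like withWatermark("eventTime", "2 months") is honored.
val twoMonths = CalendarInterval.fromString("interval 2 months")
assert(delayToMs(twoMonths) == 2L * 31 * 24 * 60 * 60 * 1000)
```

The second change in the patch is behavioral only: the "watermark" entry is added to the query progress event-time metrics only when the query actually defines an event-time watermark.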
spark git commit: [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6
Repository: spark Updated Branches: refs/heads/master b7650f11c -> 1a6438897 [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6 ## What changes were proposed in this pull request? I recently hit a bug of com.thoughtworks.paranamer/paranamer, which causes jackson to fail to handle a byte array defined in a case class. Then I found https://github.com/FasterXML/jackson-module-scala/issues/48, which suggests that it is caused by a bug in paranamer. Let's upgrade paranamer. Since we are using jackson 2.6.5 and jackson-module-paranamer 2.6.5 uses com.thoughtworks.paranamer/paranamer 2.6, I suggest that we upgrade paranamer to 2.6. Author: Yin Huai Closes #16359 from yhuai/SPARK-18951. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a643889 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a643889 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a643889 Branch: refs/heads/master Commit: 1a64388973711b4e567f25fa33d752066a018b49 Parents: b7650f1 Author: Yin Huai Authored: Wed Dec 21 09:26:13 2016 -0800 Committer: Yin Huai Committed: Wed Dec 21 09:26:13 2016 -0800 -- dev/deps/spark-deps-hadoop-2.2 | 2 +- dev/deps/spark-deps-hadoop-2.3 | 2 +- dev/deps/spark-deps-hadoop-2.4 | 2 +- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- pom.xml| 7 ++- 6 files changed, 11 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.2 -- diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2 index afbdae0..9cbab3d8 100644 --- a/dev/deps/spark-deps-hadoop-2.2 +++ b/dev/deps/spark-deps-hadoop-2.2 @@ -128,7 +128,7 @@ objenesis-2.1.jar opencsv-2.3.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar -paranamer-2.3.jar +paranamer-2.6.jar parquet-column-1.8.1.jar parquet-common-1.8.1.jar parquet-encoding-1.8.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.3 -- diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 index adf3863..63ce6c6 100644 --- a/dev/deps/spark-deps-hadoop-2.3 +++ b/dev/deps/spark-deps-hadoop-2.3 @@ -135,7 +135,7 @@ objenesis-2.1.jar opencsv-2.3.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar -paranamer-2.3.jar +paranamer-2.6.jar parquet-column-1.8.1.jar parquet-common-1.8.1.jar parquet-encoding-1.8.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.4 -- diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index 88e6b3f..122d5c2 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -135,7 +135,7 @@ objenesis-2.1.jar opencsv-2.3.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar -paranamer-2.3.jar +paranamer-2.6.jar parquet-column-1.8.1.jar parquet-common-1.8.1.jar parquet-encoding-1.8.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 15c5d9f..776aabd 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -143,7 +143,7 @@ objenesis-2.1.jar opencsv-2.3.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar -paranamer-2.3.jar +paranamer-2.6.jar parquet-column-1.8.1.jar parquet-common-1.8.1.jar parquet-encoding-1.8.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.7 -- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 
index 77fb537..524e824 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -144,7 +144,7 @@ objenesis-2.1.jar opencsv-2.3.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar -paranamer-2.3.jar +paranamer-2.6.jar parquet-column-1.8.1.jar parquet-common-1.8.1.jar parquet-encoding-1.8.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/pom.xml -- diff --git a/pom.xml b/pom.xml index 4f12085..72e5442 100644 --- a/pom.xml +++ b/pom.xml @@ -179,7 +179,7 @@ 4.5.3 1.1 2.52.0 -2.8 +2.6 1.8 1.0.0 @@ -1863,6 +1863,11 @@ + +com.thoughtworks.paranamer +paranamer +${paranamer.version} + -
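For context on the failure mode described in the commit message, here is a hypothetical round-trip sketch (the `Record` class and object name are illustrative, not from the patch), assuming jackson-module-scala 2.6.5 with paranamer-based constructor parameter-name resolution on the classpath.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// A case class whose constructor takes a byte array: the shape reported to
// trip jackson-module-scala when the older paranamer 2.3 resolves parameter names.
case class Record(id: Long, payload: Array[Byte])

object ParanamerRoundTrip {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)

    // jackson-databind writes byte arrays as base64 strings
    val json = mapper.writeValueAsString(Record(1L, Array[Byte](1, 2, 3)))

    // With the paranamer 2.6 pinned by this commit the value deserializes again;
    // with the older 2.3 jar this step is where the failure was observed.
    val back = mapper.readValue(json, classOf[Record])
    println(s"${back.id} -> ${back.payload.length} bytes")
  }
}
```

The pom.xml change pins `paranamer` to the 2.6 release that jackson-module-paranamer 2.6.5 itself depends on, and the `dev/deps` manifests are updated to match.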
spark git commit: [SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables
Repository: spark Updated Branches: refs/heads/branch-2.0 2aae220b5 -> ef206ace2 [SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables ## What changes were proposed in this pull request? It's a huge waste to call `Catalog.listTables` in `SQLContext.tableNames`, which only need the table names, while `Catalog.listTables` will get the table metadata for each table name. ## How was this patch tested? N/A Author: Wenchen Fan Closes #16352 from cloud-fan/minor. (cherry picked from commit b7650f11c7afbdffc6f5caaafb5dcfd54f7a25ff) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef206ace Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef206ace Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef206ace Branch: refs/heads/branch-2.0 Commit: ef206ace24d8588782fd9bd4fdf120f20fbfe841 Parents: 2aae220 Author: Wenchen Fan Authored: Wed Dec 21 19:39:00 2016 +0800 Committer: Wenchen Fan Committed: Wed Dec 21 19:40:15 2016 +0800 -- .../src/main/scala/org/apache/spark/sql/SQLContext.scala| 4 ++-- .../main/scala/org/apache/spark/sql/api/r/SQLUtils.scala| 9 + 2 files changed, 7 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef206ace/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index e7627ac..6013c01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -730,7 +730,7 @@ class SQLContext private[sql](val sparkSession: SparkSession) * @since 1.3.0 */ def tableNames(): Array[String] = { -sparkSession.catalog.listTables().collect().map(_.name) +tableNames(sparkSession.catalog.currentDatabase) } /** @@ -740,7 +740,7 @@ class SQLContext private[sql](val sparkSession: SparkSession) * @since 1.3.0 */ def tableNames(databaseName: String): Array[String] = { -sparkSession.catalog.listTables(databaseName).collect().map(_.name) +sessionState.catalog.listTables(databaseName).map(_.table).toArray } /** http://git-wip-us.apache.org/repos/asf/spark/blob/ef206ace/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 08f8ded..8bc7f67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -276,11 +276,12 @@ private[sql] object SQLUtils extends Logging { } def getTableNames(sparkSession: SparkSession, databaseName: String): Array[String] = { -databaseName match { - case n: String if n != null && n.trim.nonEmpty => -sparkSession.catalog.listTables(n).collect().map(_.name) +val db = databaseName match { + case _ if databaseName != null && databaseName.trim.nonEmpty => +databaseName case _ => -sparkSession.catalog.listTables().collect().map(_.name) +sparkSession.catalog.currentDatabase } +sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables
Repository: spark Updated Branches: refs/heads/branch-2.1 063a98e52 -> bc54a14b4 [SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables ## What changes were proposed in this pull request? It's a huge waste to call `Catalog.listTables` in `SQLContext.tableNames`, which only need the table names, while `Catalog.listTables` will get the table metadata for each table name. ## How was this patch tested? N/A Author: Wenchen Fan Closes #16352 from cloud-fan/minor. (cherry picked from commit b7650f11c7afbdffc6f5caaafb5dcfd54f7a25ff) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc54a14b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc54a14b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc54a14b Branch: refs/heads/branch-2.1 Commit: bc54a14b415041531f94ccc2dd35851c269e8263 Parents: 063a98e Author: Wenchen Fan Authored: Wed Dec 21 19:39:00 2016 +0800 Committer: Wenchen Fan Committed: Wed Dec 21 19:39:55 2016 +0800 -- .../src/main/scala/org/apache/spark/sql/SQLContext.scala| 4 ++-- .../main/scala/org/apache/spark/sql/api/r/SQLUtils.scala| 9 + 2 files changed, 7 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc54a14b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 6554359..1a7fd68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -747,7 +747,7 @@ class SQLContext private[sql](val sparkSession: SparkSession) * @since 1.3.0 */ def tableNames(): Array[String] = { -sparkSession.catalog.listTables().collect().map(_.name) +tableNames(sparkSession.catalog.currentDatabase) } /** @@ -757,7 +757,7 @@ class SQLContext private[sql](val sparkSession: SparkSession) * @since 1.3.0 */ def tableNames(databaseName: String): Array[String] = { -sparkSession.catalog.listTables(databaseName).collect().map(_.name) +sessionState.catalog.listTables(databaseName).map(_.table).toArray } http://git-wip-us.apache.org/repos/asf/spark/blob/bc54a14b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 80bbad4..e56c33e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -276,11 +276,12 @@ private[sql] object SQLUtils extends Logging { } def getTableNames(sparkSession: SparkSession, databaseName: String): Array[String] = { -databaseName match { - case n: String if n != null && n.trim.nonEmpty => -sparkSession.catalog.listTables(n).collect().map(_.name) +val db = databaseName match { + case _ if databaseName != null && databaseName.trim.nonEmpty => +databaseName case _ => -sparkSession.catalog.listTables().collect().map(_.name) +sparkSession.catalog.currentDatabase } +sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables
Repository: spark Updated Branches: refs/heads/master ba4468bb2 -> b7650f11c [SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables ## What changes were proposed in this pull request? It's a huge waste to call `Catalog.listTables` in `SQLContext.tableNames`, which only need the table names, while `Catalog.listTables` will get the table metadata for each table name. ## How was this patch tested? N/A Author: Wenchen Fan Closes #16352 from cloud-fan/minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7650f11 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7650f11 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7650f11 Branch: refs/heads/master Commit: b7650f11c7afbdffc6f5caaafb5dcfd54f7a25ff Parents: ba4468b Author: Wenchen Fan Authored: Wed Dec 21 19:39:00 2016 +0800 Committer: Wenchen Fan Committed: Wed Dec 21 19:39:00 2016 +0800 -- .../src/main/scala/org/apache/spark/sql/SQLContext.scala| 4 ++-- .../main/scala/org/apache/spark/sql/api/r/SQLUtils.scala| 9 + 2 files changed, 7 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b7650f11/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 6554359..1a7fd68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -747,7 +747,7 @@ class SQLContext private[sql](val sparkSession: SparkSession) * @since 1.3.0 */ def tableNames(): Array[String] = { -sparkSession.catalog.listTables().collect().map(_.name) +tableNames(sparkSession.catalog.currentDatabase) } /** @@ -757,7 +757,7 @@ class SQLContext private[sql](val sparkSession: SparkSession) * @since 1.3.0 */ def tableNames(databaseName: String): Array[String] = { -sparkSession.catalog.listTables(databaseName).collect().map(_.name) +sessionState.catalog.listTables(databaseName).map(_.table).toArray } http://git-wip-us.apache.org/repos/asf/spark/blob/b7650f11/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 80bbad4..e56c33e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -276,11 +276,12 @@ private[sql] object SQLUtils extends Logging { } def getTableNames(sparkSession: SparkSession, databaseName: String): Array[String] = { -databaseName match { - case n: String if n != null && n.trim.nonEmpty => -sparkSession.catalog.listTables(n).collect().map(_.name) +val db = databaseName match { + case _ if databaseName != null && databaseName.trim.nonEmpty => +databaseName case _ => -sparkSession.catalog.listTables().collect().map(_.name) +sparkSession.catalog.currentDatabase } +sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
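The patch does not change what `tableNames` returns; it only avoids building per-table metadata to obtain the names. A rough usage sketch, assuming a local session and an illustrative table name `t1` (neither is from the patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("tableNames-sketch").master("local[2]").getOrCreate()
spark.sql("CREATE TABLE IF NOT EXISTS t1(i INT) USING parquet")

// Cheap path after the patch: only table names are listed from the session catalog.
val names: Array[String] = spark.sqlContext.tableNames()

// Catalog.listTables, by contrast, materializes a full Table row (name, database,
// description, tableType, isTemporary) per entry, which the old implementation
// computed and then discarded apart from the name.
val fromCatalog = spark.catalog.listTables().collect().map(_.name)

assert(names.sorted.sameElements(fromCatalog.sorted))
```

The same change is applied in `SQLUtils.getTableNames`, which backs the SparkR table-listing API.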
spark git commit: [SPARK-18923][DOC][BUILD] Support skipping R/Python API docs
Repository: spark Updated Branches: refs/heads/master 24c0c9412 -> ba4468bb2 [SPARK-18923][DOC][BUILD] Support skipping R/Python API docs ## What changes were proposed in this pull request? We can build Python API docs by `cd ./python/docs && make html for Python` and R API docs by `cd ./R && sh create-docs.sh for R` separately. However, `jekyll` fails in some environments. This PR aims to support `SKIP_PYTHONDOC` and `SKIP_RDOC` for documentation build in `docs` folder. Currently, we can use `SKIP_SCALADOC` or `SKIP_API`. The reason providing additional options is that the Spark documentation build uses a number of tools to build HTML docs and API docs in Scala, Python and R. Specifically, for Python and R, - Python API docs requires `sphinx`. - R API docs requires `R` installation and `knitr` (and more others libraries). In other words, we cannot generate Python API docs without R installation. Also, we cannot generate R API docs without Python `sphinx` installation. If Spark provides `SKIP_PYTHONDOC` and `SKIP_RDOC` like `SKIP_SCALADOC`, it would be more convenient. ## How was this patch tested? Manual. **Skipping Scala/Java/Python API Doc Build** ```bash $ cd docs $ SKIP_SCALADOC=1 SKIP_PYTHONDOC=1 jekyll build $ ls api DESCRIPTION R ``` **Skipping Scala/Java/R API Doc Build** ```bash $ cd docs $ SKIP_SCALADOC=1 SKIP_RDOC=1 jekyll build $ ls api python ``` Author: Dongjoon Hyun Closes #16336 from dongjoon-hyun/SPARK-18923. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba4468bb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba4468bb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba4468bb Branch: refs/heads/master Commit: ba4468bb24f2e13b98e7b66b44203c20393b8ab8 Parents: 24c0c94 Author: Dongjoon Hyun Authored: Wed Dec 21 08:59:38 2016 + Committer: Sean Owen Committed: Wed Dec 21 08:59:38 2016 + -- docs/README.md | 3 ++- docs/_plugins/copy_api_dirs.rb | 49 - 2 files changed, 29 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ba4468bb/docs/README.md -- diff --git a/docs/README.md b/docs/README.md index ffd3b57..90e10a1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -69,4 +69,5 @@ may take some time as it generates all of the scaladoc. The jekyll plugin also PySpark docs using [Sphinx](http://sphinx-doc.org/). NOTE: To skip the step of building and copying over the Scala, Python, R API docs, run `SKIP_API=1 -jekyll`. +jekyll`. In addition, `SKIP_SCALADOC=1`, `SKIP_PYTHONDOC=1`, and `SKIP_RDOC=1` can be used to skip a single +step of the corresponding language. http://git-wip-us.apache.org/repos/asf/spark/blob/ba4468bb/docs/_plugins/copy_api_dirs.rb -- diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb index 71e6432..95e3ba3 100644 --- a/docs/_plugins/copy_api_dirs.rb +++ b/docs/_plugins/copy_api_dirs.rb @@ -113,36 +113,41 @@ if not (ENV['SKIP_API'] == '1') File.open(css_file, 'a') { |f| f.write("\n" + css.join()) } end - # Build Sphinx docs for Python + if not (ENV['SKIP_PYTHONDOC'] == '1') +# Build Sphinx docs for Python - puts "Moving to python/docs directory and building sphinx." - cd("../python/docs") - system("make html") || raise("Python doc generation failed") +puts "Moving to python/docs directory and building sphinx." +cd("../python/docs") +system("make html") || raise("Python doc generation failed") - puts "Moving back into home dir." - cd("../../") +puts "Moving back into docs dir." 
+cd("../../docs") - puts "Making directory api/python" - mkdir_p "docs/api/python" +puts "Making directory api/python" +mkdir_p "api/python" - puts "cp -r python/docs/_build/html/. docs/api/python" - cp_r("python/docs/_build/html/.", "docs/api/python") +puts "cp -r ../python/docs/_build/html/. api/python" +cp_r("../python/docs/_build/html/.", "api/python") + end - # Build SparkR API docs - puts "Moving to R directory and building roxygen docs." - cd("R") - system("./create-docs.sh") || raise("R doc generation failed") + if not (ENV['SKIP_RDOC'] == '1') +# Build SparkR API docs - puts "Moving back into home dir." - cd("../") +puts "Moving to R directory and building roxygen docs." +cd("../R") +system("./create-docs.sh") || raise("R doc generation failed") - puts "Making directory api/R" - mkdir_p "docs/api/R" +puts "Moving back into docs dir." +cd("../docs") - puts "cp -r R/pkg/html/. docs/api/R" - cp_r("R/pkg/html/.", "docs/api/R") +puts "Making directory api/R" +mkdir_p "a