spark git commit: [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry
Repository: spark Updated Branches: refs/heads/branch-2.0 54d4eee51 -> d3f90e71a [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry ## What changes were proposed in this pull request? Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we can make sure not writing any FileEntry(..., batchId = -1) into the log. This also avoids people misusing it in future (#15203 is an example). ## How was this patch tested? Jenkins. Author: Shixiong Zhu Closes #15206 from zsxwing/cleanup. (cherry picked from commit 62ccf27ab4b55e734646678ae78b7e812262d14b) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3f90e71 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3f90e71 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3f90e71 Branch: refs/heads/branch-2.0 Commit: d3f90e71af57162afc0648adbc52b810a883ceac Parents: 54d4eee Author: Shixiong Zhu Authored: Thu Sep 22 23:35:08 2016 -0700 Committer: Shixiong Zhu Committed: Thu Sep 22 23:35:15 2016 -0700 -- .../execution/streaming/FileStreamSource.scala | 37 ++-- .../streaming/FileStreamSourceSuite.scala | 24 ++--- 2 files changed, 31 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d3f90e71/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 4515f9a..8c3e718 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -59,7 +59,7 @@ class FileStreamSource( val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) metadataLog.allFiles().foreach { entry => -seenFiles.add(entry) +seenFiles.add(entry.path, entry.timestamp) } seenFiles.purge() @@ -73,14 +73,16 @@ class FileStreamSource( */ private def fetchMaxOffset(): LongOffset = synchronized { // All the new files found - ignore aged files and files that we have seen. -val newFiles = fetchAllFiles().filter(seenFiles.isNewFile) +val newFiles = fetchAllFiles().filter { + case (path, timestamp) => seenFiles.isNewFile(path, timestamp) +} // Obey user's setting to limit the number of files in this batch trigger. val batchFiles = if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles batchFiles.foreach { file => - seenFiles.add(file) + seenFiles.add(file._1, file._2) logDebug(s"New file: $file") } val numPurged = seenFiles.purge() @@ -95,7 +97,9 @@ class FileStreamSource( if (batchFiles.nonEmpty) { maxBatchId += 1 - metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray) + metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) => +FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId) + }.toArray) logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") } @@ -140,12 +144,12 @@ class FileStreamSource( /** * Returns a list of files found, sorted by their timestamp. 
*/ - private def fetchAllFiles(): Seq[FileEntry] = { + private def fetchAllFiles(): Seq[(String, Long)] = { val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType)) val files = catalog.allFiles().sortBy(_.getModificationTime).map { status => - FileEntry(status.getPath.toUri.toString, status.getModificationTime) + (status.getPath.toUri.toString, status.getModificationTime) } val endTime = System.nanoTime val listingTimeMs = (endTime.toDouble - startTime) / 100 @@ -172,10 +176,7 @@ object FileStreamSource { /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */ type Timestamp = Long - val NOT_SET = -1L - - case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET) -extends Serializable + case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable /** * A custom hash map used to track the list of files seen. This map is not thread-safe. @@ -196,10 +197,10 @@ object FileStreamSource { private var lastPurgeTimestamp: Timestamp = 0L /** Add a new file to the map.
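To make the intent of the change above concrete, here is a small self-contained Scala sketch (not the Spark source; the names only mirror FileStreamSource.FileEntry): directory listing now yields plain (path, timestamp) pairs, and a FileEntry is constructed only once a real batch id is known, so an entry with batchId = -1 can never reach the metadata log.

```scala
// Simplified sketch of the SPARK-17640 change; FileEntry here is a stand-in, not the Spark class.
object FileEntrySketch {
  type Timestamp = Long

  // No `batchId: Long = NOT_SET` default any more: callers must supply the batch id.
  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable

  def main(args: Array[String]): Unit = {
    // Listing produces plain (path, timestamp) pairs...
    val listed: Seq[(String, Timestamp)] = Seq(("/data/a.json", 100L), ("/data/b.json", 101L))

    // ...and entries are materialized only after a batch id has been assigned,
    // so FileEntry(..., batchId = -1) cannot be written to the metadata log.
    val maxBatchId = 7L
    val logged = listed.map { case (path, ts) => FileEntry(path, ts, batchId = maxBatchId) }
    logged.foreach(println)
  }
}
```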
spark git commit: [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry
Repository: spark Updated Branches: refs/heads/master 947b8c6e3 -> 62ccf27ab [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry ## What changes were proposed in this pull request? Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we can make sure not writing any FileEntry(..., batchId = -1) into the log. This also avoids people misusing it in future (#15203 is an example). ## How was this patch tested? Jenkins. Author: Shixiong Zhu Closes #15206 from zsxwing/cleanup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62ccf27a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62ccf27a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62ccf27a Branch: refs/heads/master Commit: 62ccf27ab4b55e734646678ae78b7e812262d14b Parents: 947b8c6 Author: Shixiong Zhu Authored: Thu Sep 22 23:35:08 2016 -0700 Committer: Shixiong Zhu Committed: Thu Sep 22 23:35:08 2016 -0700 -- .../execution/streaming/FileStreamSource.scala | 37 ++-- .../streaming/FileStreamSourceSuite.scala | 24 ++--- 2 files changed, 31 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/62ccf27a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 5ebc083..be02327 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -59,7 +59,7 @@ class FileStreamSource( val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) metadataLog.allFiles().foreach { entry => -seenFiles.add(entry) +seenFiles.add(entry.path, entry.timestamp) } seenFiles.purge() @@ -73,14 +73,16 @@ class FileStreamSource( */ private def fetchMaxOffset(): LongOffset = synchronized { // All the new files found - ignore aged files and files that we have seen. -val newFiles = fetchAllFiles().filter(seenFiles.isNewFile) +val newFiles = fetchAllFiles().filter { + case (path, timestamp) => seenFiles.isNewFile(path, timestamp) +} // Obey user's setting to limit the number of files in this batch trigger. val batchFiles = if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles batchFiles.foreach { file => - seenFiles.add(file) + seenFiles.add(file._1, file._2) logDebug(s"New file: $file") } val numPurged = seenFiles.purge() @@ -95,7 +97,9 @@ class FileStreamSource( if (batchFiles.nonEmpty) { maxBatchId += 1 - metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray) + metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) => +FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId) + }.toArray) logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") } @@ -140,12 +144,12 @@ class FileStreamSource( /** * Returns a list of files found, sorted by their timestamp. 
*/ - private def fetchAllFiles(): Seq[FileEntry] = { + private def fetchAllFiles(): Seq[(String, Long)] = { val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType)) val files = catalog.allFiles().sortBy(_.getModificationTime).map { status => - FileEntry(status.getPath.toUri.toString, status.getModificationTime) + (status.getPath.toUri.toString, status.getModificationTime) } val endTime = System.nanoTime val listingTimeMs = (endTime.toDouble - startTime) / 100 @@ -172,10 +176,7 @@ object FileStreamSource { /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */ type Timestamp = Long - val NOT_SET = -1L - - case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET) -extends Serializable + case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable /** * A custom hash map used to track the list of files seen. This map is not thread-safe. @@ -196,10 +197,10 @@ object FileStreamSource { private var lastPurgeTimestamp: Timestamp = 0L /** Add a new file to the map. */ -def add(file: FileEntry): Unit = { - map.put(file.path, file.timestamp) - if (file.times
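The hunks above also lean on the SeenFilesMap helper (add, isNewFile, purge). The following is a simplified, self-contained sketch of that bookkeeping, assuming only the semantics visible in the diff (track path -> modification time, treat files older than maxFileAgeMs relative to the newest timestamp as already seen, purge aged entries); the real Spark class may differ in details.

```scala
import scala.collection.mutable

// Sketch of a seen-files map with age-based purging; not the Spark implementation.
class SeenFilesMapSketch(maxAgeMs: Long) {
  private val map = new mutable.HashMap[String, Long]  // path -> modification time
  private var latestTimestamp = 0L
  private var lastPurgeTimestamp = 0L

  def add(path: String, timestamp: Long): Unit = {
    map.put(path, timestamp)
    if (timestamp > latestTimestamp) latestTimestamp = timestamp
  }

  // A file is "new" if it has not aged out and has not been recorded yet.
  def isNewFile(path: String, timestamp: Long): Boolean =
    timestamp >= lastPurgeTimestamp && !map.contains(path)

  // Drop entries older than the age threshold; return how many were removed.
  def purge(): Int = {
    lastPurgeTimestamp = math.max(latestTimestamp - maxAgeMs, 0L)
    val before = map.size
    map.retain { case (_, ts) => ts >= lastPurgeTimestamp }
    before - map.size
  }
}

object SeenFilesMapSketchDemo {
  def main(args: Array[String]): Unit = {
    val seen = new SeenFilesMapSketch(maxAgeMs = 1000L)
    seen.add("/data/a.json", 100L)
    seen.add("/data/b.json", 5000L)
    println(seen.isNewFile("/data/a.json", 100L))  // false: already recorded
    println(seen.purge())                          // 1: the aged-out entry is dropped
  }
}
```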
spark git commit: [SPARK-16240][ML] ML persistence backward compatibility for LDA - 2.0 backport
Repository: spark Updated Branches: refs/heads/branch-2.0 22216d6bd -> 54d4eee51 [SPARK-16240][ML] ML persistence backward compatibility for LDA - 2.0 backport ## What changes were proposed in this pull request? Allow Spark 2.x to load instances of LDA, LocalLDAModel, and DistributedLDAModel saved from Spark 1.6. Backport of https://github.com/apache/spark/pull/15034 for branch-2.0 ## How was this patch tested? I tested this manually, saving the 3 types from 1.6 and loading them into master (2.x). In the future, we can add generic tests for testing backwards compatibility across all ML models in SPARK-15573. Author: Gayathri Murali Author: Joseph K. Bradley Closes #15205 from jkbradley/lda-backward-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54d4eee5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54d4eee5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54d4eee5 Branch: refs/heads/branch-2.0 Commit: 54d4eee51eca364d9334141f62e0478343345d06 Parents: 22216d6 Author: Gayathri Murali Authored: Thu Sep 22 22:44:20 2016 -0700 Committer: Joseph K. Bradley Committed: Thu Sep 22 22:44:20 2016 -0700 -- .../org/apache/spark/ml/clustering/LDA.scala| 86 project/MimaExcludes.scala | 3 + 2 files changed, 72 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/54d4eee5/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 034f2c3..8e23325 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -18,6 +18,9 @@ package org.apache.spark.ml.clustering import org.apache.hadoop.fs.Path +import org.json4s.DefaultFormats +import org.json4s.JsonAST.JObject +import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.{DeveloperApi, Experimental, Since} import org.apache.spark.internal.Logging @@ -26,19 +29,21 @@ import org.apache.spark.ml.linalg.{Matrix, Vector, Vectors, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed} import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.DefaultParamsReader.Metadata import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, OnlineLDAOptimizer => OldOnlineLDAOptimizer} import org.apache.spark.mllib.impl.PeriodicCheckpointer -import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Vector => OldVector, - Vectors => OldVectors} +import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.MatrixImplicits._ import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf} import org.apache.spark.sql.types.StructType +import org.apache.spark.util.VersionUtils private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter @@ -80,6 +85,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM * - Values 
should be >= 0 * - default = uniformly (1.0 / k), following the implementation from * [[https://github.com/Blei-Lab/onlineldavb]]. + * * @group param */ @Since("1.6.0") @@ -121,6 +127,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM * - Value should be >= 0 * - default = (1.0 / k), following the implementation from * [[https://github.com/Blei-Lab/onlineldavb]]. + * * @group param */ @Since("1.6.0") @@ -354,6 +361,39 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM } } +private object LDAParams { + + /** + * Equivalent to [[DefaultParamsReader.getAndSetParams()]], but handles [[LDA]] and [[LDAModel]] + * formats saved with Spark 1.6, which differ from the formats in Spark 2.0+. + * + * @param model[[LDA]] or [[LDAModel]] instance. This instance will be modified with + * [[Param]] values extracted from metadata. + * @param metadata Loaded model metadata + */ + def getAndSetParams(model: LDAParams, metadata: Metadata): U
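The hunk above branches on the Spark version recorded in the saved model metadata. Below is a hedged, self-contained sketch of that version-dispatch pattern; Metadata, parseOldFormat and parseCurrentFormat are stand-ins, not the actual Spark classes or methods.

```scala
// Sketch of loading params with a fallback for an older on-disk layout.
object VersionedLoadSketch {
  final case class Metadata(sparkVersion: String, params: Map[String, String])

  // Roughly what VersionUtils.majorMinorVersion does: "1.6.3" -> (1, 6).
  def majorMinor(version: String): (Int, Int) = {
    val parts = version.split("\\.")
    (parts(0).toInt, parts(1).toInt)
  }

  def getAndSetParams(metadata: Metadata): Map[String, String] =
    majorMinor(metadata.sparkVersion) match {
      case (1, 6) => parseOldFormat(metadata)      // Spark 1.6 layout
      case _      => parseCurrentFormat(metadata)  // Spark 2.0+ layout
    }

  // Toy stand-ins for the two format readers.
  private def parseOldFormat(m: Metadata): Map[String, String] =
    m.params.map { case (k, v) => k.stripPrefix("old.") -> v }
  private def parseCurrentFormat(m: Metadata): Map[String, String] = m.params

  def main(args: Array[String]): Unit = {
    println(getAndSetParams(Metadata("1.6.3", Map("old.k" -> "10"))))
    println(getAndSetParams(Metadata("2.0.1", Map("k" -> "10"))))
  }
}
```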
spark git commit: [SPARK-16719][ML] Random Forests should communicate fewer trees on each iteration
Repository: spark Updated Branches: refs/heads/master a4aeb7677 -> 947b8c6e3 [SPARK-16719][ML] Random Forests should communicate fewer trees on each iteration ## What changes were proposed in this pull request? RandomForest currently sends the entire forest to each worker on each iteration. This is because (a) the node queue is FIFO and (b) the closure references the entire array of trees (topNodes). (a) causes RFs to handle splits in many trees, especially early on in learning. (b) sends all trees explicitly. This PR: (a) Change the RF node queue to be FILO (a stack), so that RFs tend to focus on 1 or a few trees before focusing on others. (b) Change topNodes to pass only the trees required on that iteration. ## How was this patch tested? Unit tests: * Existing tests for correctness of tree learning * Manually modifying code and running tests to verify that a small number of trees are communicated on each iteration * This last item is hard to test via unit tests given the current APIs. Author: Joseph K. Bradley Closes #14359 from jkbradley/rfs-fewer-trees. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/947b8c6e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/947b8c6e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/947b8c6e Branch: refs/heads/master Commit: 947b8c6e3acd671d501f0ed6c077aac8e51ccede Parents: a4aeb76 Author: Joseph K. Bradley Authored: Thu Sep 22 22:27:28 2016 -0700 Committer: Joseph K. Bradley Committed: Thu Sep 22 22:27:28 2016 -0700 -- .../spark/ml/tree/impl/RandomForest.scala | 54 .../spark/ml/tree/impl/RandomForestSuite.scala | 26 +- 2 files changed, 46 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/947b8c6e/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index 71c8c42..0b7ad92 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -51,7 +51,7 @@ import org.apache.spark.util.random.{SamplingUtils, XORShiftRandom} * findSplits() method during initialization, after which each continuous feature becomes * an ordered discretized feature with at most maxBins possible values. * - * The main loop in the algorithm operates on a queue of nodes (nodeQueue). These nodes + * The main loop in the algorithm operates on a queue of nodes (nodeStack). These nodes * lie at the periphery of the tree being trained. If multiple trees are being trained at once, * then this queue contains nodes from all of them. Each iteration works roughly as follows: * On the master node: @@ -161,31 +161,42 @@ private[spark] object RandomForest extends Logging { None } -// FIFO queue of nodes to train: (treeIndex, node) -val nodeQueue = new mutable.Queue[(Int, LearningNode)]() +/* + Stack of nodes to train: (treeIndex, node) + The reason this is a stack is that we train many trees at once, but we want to focus on + completing trees, rather than training all simultaneously. If we are splitting nodes from + 1 tree, then the new nodes to split will be put at the top of this stack, so we will continue + training the same tree in the next iteration. This focus allows us to send fewer trees to + workers on each iteration; see topNodesForGroup below. 
+ */ +val nodeStack = new mutable.Stack[(Int, LearningNode)] val rng = new Random() rng.setSeed(seed) // Allocate and queue root nodes. val topNodes = Array.fill[LearningNode](numTrees)(LearningNode.emptyNode(nodeIndex = 1)) -Range(0, numTrees).foreach(treeIndex => nodeQueue.enqueue((treeIndex, topNodes(treeIndex +Range(0, numTrees).foreach(treeIndex => nodeStack.push((treeIndex, topNodes(treeIndex timer.stop("init") -while (nodeQueue.nonEmpty) { +while (nodeStack.nonEmpty) { // Collect some nodes to split, and choose features for each node (if subsampling). // Each group of nodes may come from one or multiple trees, and at multiple levels. val (nodesForGroup, treeToNodeToIndexInfo) = -RandomForest.selectNodesToSplit(nodeQueue, maxMemoryUsage, metadata, rng) +RandomForest.selectNodesToSplit(nodeStack, maxMemoryUsage, metadata, rng) // Sanity check (should never occur): assert(nodesForGroup.nonEmpty, s"RandomForest selected empty nodesForGroup. Error for unknown reason.") + // Only send tree
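The core of the change is visible in the hunk above: a mutable.Stack replaces the FIFO queue. Here is a toy, self-contained illustration (not Spark code) of why LIFO ordering keeps the trainer focused on one tree: the children pushed for the tree just split are popped before the untouched roots of the other trees, so each group of nodes tends to come from fewer trees and less of the forest has to be shipped to executors.

```scala
import scala.collection.mutable

// Toy demo of node ordering; (treeIndex, nodeLabel) stands in for (Int, LearningNode).
object NodeOrderSketch {
  def main(args: Array[String]): Unit = {
    val stack = new mutable.Stack[(Int, String)]

    // Push the root of each of 3 trees, as the training loop does once at startup.
    (0 until 3).foreach(t => stack.push((t, "root")))

    // Split the node on top of the stack; its children go back on top.
    val (tree, _) = stack.pop()
    stack.push((tree, "root.left"))
    stack.push((tree, "root.right"))

    // The next iterations keep working on the same tree before touching the others.
    println(stack.toList.map(_._1))  // List(2, 2, 1, 0): tree 2's children come before the other roots
  }
}
```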
spark git commit: [SPARK-17639][BUILD] Add jce.jar to buildclasspath when building.
Repository: spark Updated Branches: refs/heads/master 79159a1e8 -> a4aeb7677 [SPARK-17639][BUILD] Add jce.jar to buildclasspath when building. This was missing, preventing code that uses javax.crypto to properly compile in Spark. Author: Marcelo Vanzin Closes #15204 from vanzin/SPARK-17639. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4aeb767 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4aeb767 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4aeb767 Branch: refs/heads/master Commit: a4aeb7677bc07d0b83f82de62dcffd7867d19d9b Parents: 79159a1 Author: Marcelo Vanzin Authored: Thu Sep 22 21:35:25 2016 -0700 Committer: Marcelo Vanzin Committed: Thu Sep 22 21:35:25 2016 -0700 -- core/pom.xml | 4 +--- pom.xml | 7 --- project/SparkBuild.scala | 4 ++-- 3 files changed, 7 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a4aeb767/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 3c8138f..9a4f234 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -417,7 +417,6 @@ -\ .bat @@ -429,7 +428,6 @@ -/ .sh @@ -450,7 +448,7 @@ - ..${path.separator}R${path.separator}install-dev${script.extension} + ..${file.separator}R${file.separator}install-dev${script.extension} http://git-wip-us.apache.org/repos/asf/spark/blob/a4aeb767/pom.xml -- diff --git a/pom.xml b/pom.xml index 8afc39b..8408f4b 100644 --- a/pom.xml +++ b/pom.xml @@ -2617,8 +2617,9 @@ -bootclasspath - ${env.JAVA_7_HOME}/jre/lib/rt.jar + ${env.JAVA_7_HOME}/jre/lib/rt.jar${path.separator}${env.JAVA_7_HOME}/jre/lib/jce.jar +true @@ -2633,7 +2634,7 @@ -javabootclasspath - ${env.JAVA_7_HOME}/jre/lib/rt.jar + ${env.JAVA_7_HOME}/jre/lib/rt.jar${path.separator}${env.JAVA_7_HOME}/jre/lib/jce.jar @@ -2642,7 +2643,7 @@ -javabootclasspath - ${env.JAVA_7_HOME}/jre/lib/rt.jar + ${env.JAVA_7_HOME}/jre/lib/rt.jar${path.separator}${env.JAVA_7_HOME}/jre/lib/jce.jar http://git-wip-us.apache.org/repos/asf/spark/blob/a4aeb767/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a39c93e..8e47e7f 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -280,7 +280,7 @@ object SparkBuild extends PomBuild { "-target", javacJVMVersion.value ) ++ sys.env.get("JAVA_7_HOME").toSeq.flatMap { jdk7 => if (javacJVMVersion.value == "1.7") { -Seq("-bootclasspath", s"$jdk7/jre/lib/rt.jar") +Seq("-bootclasspath", s"$jdk7/jre/lib/rt.jar${File.pathSeparator}$jdk7/jre/lib/jce.jar") } else { Nil } @@ -291,7 +291,7 @@ object SparkBuild extends PomBuild { "-sourcepath", (baseDirectory in ThisBuild).value.getAbsolutePath // Required for relative source links in scaladoc ) ++ sys.env.get("JAVA_7_HOME").toSeq.flatMap { jdk7 => if (javacJVMVersion.value == "1.7") { -Seq("-javabootclasspath", s"$jdk7/jre/lib/rt.jar") +Seq("-javabootclasspath", s"$jdk7/jre/lib/rt.jar${File.pathSeparator}$jdk7/jre/lib/jce.jar") } else { Nil } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
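As a minimal sketch of the fix, the compiler arguments are built with the platform path separator so both rt.jar and jce.jar from the JDK 7 home end up on the boot classpath; JAVA_7_HOME is an environment variable the Spark build already assumes.

```scala
import java.io.File

// Sketch: assemble -bootclasspath with rt.jar and jce.jar, joined by ':' (Unix) or ';' (Windows).
object BootClasspathSketch {
  def bootClasspathArgs(jdk7Home: String): Seq[String] =
    Seq("-bootclasspath",
      s"$jdk7Home/jre/lib/rt.jar${File.pathSeparator}$jdk7Home/jre/lib/jce.jar")

  def main(args: Array[String]): Unit =
    sys.env.get("JAVA_7_HOME").foreach(home => println(bootClasspathArgs(home).mkString(" ")))
}
```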
spark git commit: [SPARK-17502][17609][SQL][BACKPORT][2.0] Fix Multiple Bugs in DDL Statements on Temporary Views
Repository: spark Updated Branches: refs/heads/branch-2.0 c393d86d1 -> 22216d6bd [SPARK-17502][17609][SQL][BACKPORT][2.0] Fix Multiple Bugs in DDL Statements on Temporary Views ### What changes were proposed in this pull request? This PR is to backport https://github.com/apache/spark/pull/15054 and https://github.com/apache/spark/pull/15160 to Spark 2.0. - When the permanent tables/views do not exist but the temporary view exists, the expected error should be `NoSuchTableException` for partition-related ALTER TABLE commands. However, it always reports a confusing error message. For example, ``` Partition spec is invalid. The spec (a, b) must match the partition spec () defined in table '`testview`'; ``` - When the permanent tables/views do not exist but the temporary view exists, the expected error should be `NoSuchTableException` for `ALTER TABLE ... UNSET TBLPROPERTIES`. However, it reports a missing table property. For example, ``` Attempted to unset non-existent property 'p' in table '`testView`'; ``` - When `ANALYZE TABLE` is called on a view or a temporary view, we should issue an error message. However, it reports a strange error: ``` ANALYZE TABLE is not supported for Project ``` - When inserting into a temporary view that is generated from `Range`, we will get the following error message: ``` assertion failed: No plan for 'InsertIntoTable Range (0, 10, step=1, splits=Some(1)), false, false +- Project [1 AS 1#20] +- OneRowRelation$ ``` This PR is to fix the above four issues. There is no place in Spark SQL that need `SessionCatalog.tableExists` to check temp views, so this PR makes `SessionCatalog.tableExists` only check permanent table/view and removes some hacks. ### How was this patch tested? Added multiple test cases Author: gatorsmile Closes #15174 from gatorsmile/PR15054Backport. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22216d6b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22216d6b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22216d6b Branch: refs/heads/branch-2.0 Commit: 22216d6bd4270095f175d9f4333afe07e07a7303 Parents: c393d86 Author: gatorsmile Authored: Fri Sep 23 09:56:40 2016 +0800 Committer: Wenchen Fan Committed: Fri Sep 23 09:56:40 2016 +0800 -- .../sql/catalyst/analysis/CheckAnalysis.scala | 1 + .../sql/catalyst/catalog/SessionCatalog.scala | 101 + .../catalyst/catalog/SessionCatalogSuite.scala | 37 +++ .../org/apache/spark/sql/DataFrameWriter.scala | 9 +- .../execution/command/AnalyzeTableCommand.scala | 5 +- .../command/createDataSourceTables.scala| 13 +-- .../spark/sql/execution/command/ddl.scala | 73 +--- .../spark/sql/execution/command/tables.scala| 110 +++ .../apache/spark/sql/internal/CatalogImpl.scala | 2 +- .../spark/sql/execution/command/DDLSuite.scala | 4 +- .../sql/hive/MetastoreDataSourcesSuite.scala| 2 +- .../sql/hive/execution/HiveCommandSuite.scala | 17 ++- .../spark/sql/hive/execution/HiveDDLSuite.scala | 6 +- .../spark/sql/hive/execution/SQLViewSuite.scala | 59 -- 14 files changed, 219 insertions(+), 220 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/22216d6b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 8b87a4e..790566c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -342,6 +342,7 @@ trait CheckAnalysis extends PredicateHelper { case InsertIntoTable(t, _, _, _, _) if !t.isInstanceOf[LeafNode] || + t.isInstanceOf[Range] || t == OneRowRelation || t.isInstanceOf[LocalRelation] => failAnalysis(s"Inserting into an RDD-based table is not allowed.") http://git-wip-us.apache.org/repos/asf/spark/blob/22216d6b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index ecb4dab..f455cc9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/c
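The key behavioral point in the description above is that SessionCatalog.tableExists now checks only permanent tables and views. A self-contained sketch of that contract (not Spark's SessionCatalog; NoSuchElementException stands in for NoSuchTableException):

```scala
import scala.collection.mutable

// Sketch: temporary views no longer satisfy tableExists, so partition-related
// DDL on a name that only exists as a temp view fails with a clear "not found" error.
class CatalogSketch {
  private val permanentTables = mutable.Set[String]()
  private val tempViews = mutable.Set[String]()

  def createTable(name: String): Unit = permanentTables += name.toLowerCase
  def createTempView(name: String): Unit = tempViews += name.toLowerCase

  // Only permanent tables/views count.
  def tableExists(name: String): Boolean = permanentTables.contains(name.toLowerCase)

  def alterTableDropPartition(name: String): Unit =
    if (!tableExists(name)) throw new NoSuchElementException(s"Table or view '$name' not found")
    else () // partition handling would go here
}

object CatalogSketchDemo {
  def main(args: Array[String]): Unit = {
    val catalog = new CatalogSketch
    catalog.createTempView("testview")
    println(catalog.tableExists("testview"))  // false: temp views are ignored
    try catalog.alterTableDropPartition("testview")
    catch { case e: NoSuchElementException => println(e.getMessage) }  // clear error instead of a confusing one
  }
}
```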
[1/2] spark git commit: Preparing Spark release v2.0.1-rc2
Repository: spark Updated Branches: refs/heads/branch-2.0 c2cb84165 -> c393d86d1 Preparing Spark release v2.0.1-rc2 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04141ad4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04141ad4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04141ad4 Branch: refs/heads/branch-2.0 Commit: 04141ad49806a48afccc236b699827997142bd57 Parents: c2cb841 Author: Patrick Wendell Authored: Thu Sep 22 17:43:50 2016 -0700 Committer: Patrick Wendell Committed: Thu Sep 22 17:43:50 2016 -0700 -- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 34 files changed, 34 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/04141ad4/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index ca6daa2..6db3a59 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.1 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/04141ad4/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index c727f54..269b845 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.1 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/04141ad4/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index e335a89..20cf29e 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.1 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/04141ad4/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 8e64f56..25cc328 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.1 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/04141ad4/common/sketch/pom.xml -- diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index 94c75d6..37a5d09 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.1 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/04141ad4/common/tags/pom.xml -- diff --git a/common/tags/pom.xml b/common/tags/pom.xml index 6ff14d2..ab287f3 100644 --- 
a/common/tags/pom.xml +++ b/common/tags/pom.xml @@ -22,7
[2/2] spark git commit: Preparing development version 2.0.2-SNAPSHOT
Preparing development version 2.0.2-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c393d86d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c393d86d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c393d86d Branch: refs/heads/branch-2.0 Commit: c393d86d188bd94b8713c4e0f0885b3adf49176e Parents: 04141ad Author: Patrick Wendell Authored: Thu Sep 22 17:43:58 2016 -0700 Committer: Patrick Wendell Committed: Thu Sep 22 17:43:58 2016 -0700 -- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 34 files changed, 34 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c393d86d/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 6db3a59..ca6daa2 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.0.1 +2.0.2-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/c393d86d/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 269b845..c727f54 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1 +2.0.2-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/c393d86d/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 20cf29e..e335a89 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1 +2.0.2-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/c393d86d/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 25cc328..8e64f56 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1 +2.0.2-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/c393d86d/common/sketch/pom.xml -- diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index 37a5d09..94c75d6 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1 +2.0.2-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/c393d86d/common/tags/pom.xml -- diff --git a/common/tags/pom.xml b/common/tags/pom.xml index ab287f3..6ff14d2 100644 --- a/common/tags/pom.xml +++ b/common/tags/pom.xml @@ -22,7 +22,7 @@ 
org.apache.spark spark-parent_2.11 -2.0.1 +2
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.0.1-rc2 [created] 04141ad49
spark git commit: [SPARK-17599][SPARK-17569] Backport #15153 and #15122 to Spark 2.0 branch
Repository: spark Updated Branches: refs/heads/branch-2.0 0a593db36 -> c2cb84165 [SPARK-17599][SPARK-17569] Backport and to Spark 2.0 branch ## What changes were proposed in this pull request? This Backports PR #15153 and PR #15122 to Spark 2.0 branch for Structured Streaming. It is structured a bit differently because similar code paths already existed in the 2.0 branch. The unit test makes sure that both behaviors don't break. Author: Burak Yavuz Closes #15202 from brkyvz/backports-to-streaming. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2cb8416 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2cb8416 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2cb8416 Branch: refs/heads/branch-2.0 Commit: c2cb84165960998821c53d6a45507df639aa1425 Parents: 0a593db Author: Burak Yavuz Authored: Thu Sep 22 17:22:04 2016 -0700 Committer: Reynold Xin Committed: Thu Sep 22 17:22:04 2016 -0700 -- .../execution/streaming/FileStreamSource.scala | 3 +- .../datasources/FileCatalogSuite.scala | 12 + .../streaming/FileStreamSourceSuite.scala | 51 +++- 3 files changed, 64 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c2cb8416/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 0dc08b1..4515f9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -133,7 +133,8 @@ class FileStreamSource( userSpecifiedSchema = Some(schema), className = fileFormatClassName, options = sourceOptions.optionMapWithoutPath) -Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation())) +Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation( + checkPathExist = false))) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/c2cb8416/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index 563f340..4f12df9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -70,6 +70,18 @@ class FileCatalogSuite extends SharedSQLContext { } } + test("ListingFileCatalog: folders that don't exist don't throw exceptions") { +withTempDir { dir => + val deletedFolder = new File(dir, "deleted") + assert(!deletedFolder.exists()) + val catalog1 = new ListingFileCatalog( +spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None, +ignoreFileNotFound = true) + // doesn't throw an exception + assert(catalog1.listLeafFiles(catalog1.paths).isEmpty) +} + } + test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") { class MockCatalog( override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) { http://git-wip-us.apache.org/repos/asf/spark/blob/c2cb8416/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala -- diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala index c6db2fd..dfe4bb8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala @@ -17,9 +17,19 @@ package org.apache.spark.sql.execution.streaming +import java.io.{File, FileNotFoundException} +import java.net.URI + +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} + import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType -class FileStreamSourceSuite extends SparkFunSuite {
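The new FileCatalogSuite test above exercises the behavior being backported: a listing over a folder that no longer exists should not fail the stream. A hedged sketch of that tolerant-listing idea, with listDir standing in for the Hadoop FileSystem call:

```scala
import java.io.FileNotFoundException

// Sketch: when ignoreFileNotFound is set, directories that vanish between
// discovery and listing contribute nothing instead of throwing.
object TolerantListingSketch {
  // Stand-in for a FileSystem listing that may race with a deletion.
  def listDir(path: String): Seq[String] =
    if (path.contains("deleted")) throw new FileNotFoundException(path) else Seq(s"$path/part-0")

  def listLeafFiles(paths: Seq[String], ignoreFileNotFound: Boolean): Seq[String] =
    paths.flatMap { p =>
      try listDir(p)
      catch { case _: FileNotFoundException if ignoreFileNotFound => Seq.empty }  // skip vanished dirs
    }

  def main(args: Array[String]): Unit =
    println(listLeafFiles(Seq("/data/live", "/data/deleted"), ignoreFileNotFound = true))  // List(/data/live/part-0)
}
```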
spark git commit: [SPARK-17635][SQL] Remove hardcoded "agg_plan" in HashAggregateExec
Repository: spark Updated Branches: refs/heads/master a16619683 -> 79159a1e8 [SPARK-17635][SQL] Remove hardcode "agg_plan" in HashAggregateExec ## What changes were proposed in this pull request? "agg_plan" are hardcoded in HashAggregateExec, which have potential issue, so removing them. ## How was this patch tested? existing tests. Author: Yucai Yu Closes #15199 from yucai/agg_plan. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79159a1e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79159a1e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79159a1e Branch: refs/heads/master Commit: 79159a1e87f19fb08a36857fc30b600ee7fdc52b Parents: a166196 Author: Yucai Yu Authored: Thu Sep 22 17:22:56 2016 -0700 Committer: Reynold Xin Committed: Thu Sep 22 17:22:56 2016 -0700 -- .../apache/spark/sql/execution/aggregate/HashAggregateExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/79159a1e/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 59e132d..06199ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -552,7 +552,7 @@ case class HashAggregateExec( } else { ctx.addMutableState(fastHashMapClassName, fastHashMapTerm, s"$fastHashMapTerm = new $fastHashMapClassName(" + -s"agg_plan.getTaskMemoryManager(), agg_plan.getEmptyAggregationBuffer());") +s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());") ctx.addMutableState( "org.apache.spark.unsafe.KVIterator", iterTermForFastHashMap, "") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
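The one-line fix above swaps a hard-coded "agg_plan" for the $thisPlan term that codegen actually assigned. A toy sketch of why the hard-coded name is fragile (this is plain string interpolation, not Spark's CodegenContext):

```scala
// Sketch: generated code must reference the variable name the codegen
// framework handed out, not a literal "agg_plan" that may not exist.
object CodegenNameSketch {
  def main(args: Array[String]): Unit = {
    val thisPlan = "plan_3"                 // name assigned by the (simulated) codegen context
    val fastHashMapTerm = "fastHashMap_3"
    val fastHashMapClassName = "agg_FastHashMap"

    // Fragile: works only if the plan variable happens to be named "agg_plan".
    val hardcoded = s"$fastHashMapTerm = new $fastHashMapClassName(agg_plan.getTaskMemoryManager());"
    // Robust: interpolate the actual term.
    val fixed = s"$fastHashMapTerm = new $fastHashMapClassName($thisPlan.getTaskMemoryManager());"

    println(hardcoded)
    println(fixed)
  }
}
```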
spark git commit: [SPARK-17569][TEST] Make the unit test added for SPARK-17569 work again
Repository: spark Updated Branches: refs/heads/master f4f6bd8c9 -> a16619683 [SPARK-17569][SPARK-17569][TEST] Make the unit test added for work again ## What changes were proposed in this pull request? A [PR](https://github.com/apache/spark/commit/a6aade0042d9c065669f46d2dac40ec6ce361e63) was merged concurrently that made the unit test for PR #15122 not test anything anymore. This PR fixes the test. ## How was this patch tested? Changed line https://github.com/apache/spark/blob/0d634875026ccf1eaf984996e9460d7673561f80/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L137 from `false` to `true` and made sure the unit test failed. Author: Burak Yavuz Closes #15203 from brkyvz/fix-test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1661968 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1661968 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1661968 Branch: refs/heads/master Commit: a1661968310de35e710e3b6784f63a77c44453fc Parents: f4f6bd8 Author: Burak Yavuz Authored: Thu Sep 22 16:50:22 2016 -0700 Committer: Shixiong Zhu Committed: Thu Sep 22 16:50:22 2016 -0700 -- .../spark/sql/execution/streaming/FileStreamSourceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a1661968/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala index e8fa6a5..0795a05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala @@ -92,7 +92,7 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext { val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir val metadataLog = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath) - assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L + assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0 val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), dir.getAbsolutePath, Map.empty) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16240][ML] ML persistence backward compatibility for LDA
Repository: spark Updated Branches: refs/heads/master 0d6348750 -> f4f6bd8c9 [SPARK-16240][ML] ML persistence backward compatibility for LDA ## What changes were proposed in this pull request? Allow Spark 2.x to load instances of LDA, LocalLDAModel, and DistributedLDAModel saved from Spark 1.6. ## How was this patch tested? I tested this manually, saving the 3 types from 1.6 and loading them into master (2.x). In the future, we can add generic tests for testing backwards compatibility across all ML models in SPARK-15573. Author: Joseph K. Bradley Closes #15034 from jkbradley/lda-backwards. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4f6bd8c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4f6bd8c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4f6bd8c Branch: refs/heads/master Commit: f4f6bd8c9884e3919509907307fda774f56b5ecc Parents: 0d63487 Author: Gayathri Murali Authored: Thu Sep 22 16:34:42 2016 -0700 Committer: Joseph K. Bradley Committed: Thu Sep 22 16:34:42 2016 -0700 -- .../org/apache/spark/ml/clustering/LDA.scala| 86 project/MimaExcludes.scala | 4 +- 2 files changed, 72 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f4f6bd8c/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index b5a764b..7773802 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -18,6 +18,9 @@ package org.apache.spark.ml.clustering import org.apache.hadoop.fs.Path +import org.json4s.DefaultFormats +import org.json4s.JsonAST.JObject +import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.{DeveloperApi, Experimental, Since} import org.apache.spark.internal.Logging @@ -26,19 +29,21 @@ import org.apache.spark.ml.linalg.{Matrix, Vector, Vectors, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed} import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.DefaultParamsReader.Metadata import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, OnlineLDAOptimizer => OldOnlineLDAOptimizer} import org.apache.spark.mllib.impl.PeriodicCheckpointer -import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Vector => OldVector, - Vectors => OldVectors} +import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.MatrixImplicits._ import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf} import org.apache.spark.sql.types.StructType +import org.apache.spark.util.VersionUtils private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter @@ -80,6 +85,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM * - Values should be >= 0 * - default = uniformly (1.0 / k), following the implementation from * 
[[https://github.com/Blei-Lab/onlineldavb]]. + * * @group param */ @Since("1.6.0") @@ -121,6 +127,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM * - Value should be >= 0 * - default = (1.0 / k), following the implementation from * [[https://github.com/Blei-Lab/onlineldavb]]. + * * @group param */ @Since("1.6.0") @@ -354,6 +361,39 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM } } +private object LDAParams { + + /** + * Equivalent to [[DefaultParamsReader.getAndSetParams()]], but handles [[LDA]] and [[LDAModel]] + * formats saved with Spark 1.6, which differ from the formats in Spark 2.0+. + * + * @param model[[LDA]] or [[LDAModel]] instance. This instance will be modified with + * [[Param]] values extracted from metadata. + * @param metadata Loaded model metadata + */ + def getAndSetParams(model: LDAParams, metadata: Metadata): Unit = { +VersionUtils.majorMinorVersion(metadata.sparkVersion) match { + case (1, 6) => +implicit val f
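From a user's point of view, the compatibility fix means a model directory written by Spark 1.6's ml.clustering.LDA can be read back with the 2.x loaders. A hedged usage sketch using the public API; the paths are placeholders and must point at models actually saved from 1.6.

```scala
import org.apache.spark.ml.clustering.{DistributedLDAModel, LocalLDAModel}
import org.apache.spark.sql.SparkSession

// Usage sketch: load 1.6-era LDA models with Spark 2.x.
object LoadOldLdaModels {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("lda-load").master("local[2]").getOrCreate()

    val local = LocalLDAModel.load("/models/lda-local-saved-by-1.6")                    // placeholder path
    val distributed = DistributedLDAModel.load("/models/lda-distributed-saved-by-1.6") // placeholder path

    println(s"k = ${local.getK}, vocabSize = ${local.vocabSize}")
    println(s"trainingLogLikelihood = ${distributed.trainingLogLikelihood}")
    spark.stop()
  }
}
```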
spark git commit: [SPARK-17616][SQL] Support a single distinct aggregate combined with a non-partial aggregate
Repository: spark Updated Branches: refs/heads/branch-2.0 47fc0b9f4 -> 0a593db36 [SPARK-17616][SQL] Support a single distinct aggregate combined with a non-partial aggregate We currently cannot execute an aggregate that contains a single distinct aggregate function and an one or more non-partially plannable aggregate functions, for example: ```sql select grp, collect_list(col1), count(distinct col2) from tbl_a group by 1 ``` This is a regression from Spark 1.6. This is caused by the fact that the single distinct aggregation code path assumes that all aggregates can be planned in two phases (is partially aggregatable). This PR works around this issue by triggering the `RewriteDistinctAggregates` in such cases (this is similar to the approach taken in 1.6). Created `RewriteDistinctAggregatesSuite` which checks if the aggregates with distinct aggregate functions get rewritten into two `Aggregates` and an `Expand`. Added a regression test to `DataFrameAggregateSuite`. Author: Herman van Hovell Closes #15187 from hvanhovell/SPARK-17616. (cherry picked from commit 0d634875026ccf1eaf984996e9460d7673561f80) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a593db3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a593db3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a593db3 Branch: refs/heads/branch-2.0 Commit: 0a593db360b3b7771f45f482cf45e8500f0faa76 Parents: 47fc0b9 Author: Herman van Hovell Authored: Thu Sep 22 14:29:27 2016 -0700 Committer: Herman van Hovell Committed: Thu Sep 22 16:22:31 2016 -0700 -- .../optimizer/RewriteDistinctAggregates.scala | 18 ++-- .../RewriteDistinctAggregatesSuite.scala| 94 .../spark/sql/DataFrameAggregateSuite.scala | 8 ++ 3 files changed, 111 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a593db3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index 0f43e7b..d6a39ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -119,14 +119,16 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { .filter(_.isDistinct) .groupBy(_.aggregateFunction.children.toSet) -// Aggregation strategy can handle the query with single distinct -if (distinctAggGroups.size > 1) { +// Check if the aggregates contains functions that do not support partial aggregation. +val existsNonPartial = aggExpressions.exists(!_.aggregateFunction.supportsPartial) + +// Aggregation strategy can handle queries with a single distinct group and partial aggregates. +if (distinctAggGroups.size > 1 || (distinctAggGroups.size == 1 && existsNonPartial)) { // Create the attributes for the grouping id and the group by clause. 
- val gid = -new AttributeReference("gid", IntegerType, false)(isGenerated = true) + val gid = AttributeReference("gid", IntegerType, nullable = false)(isGenerated = true) val groupByMap = a.groupingExpressions.collect { case ne: NamedExpression => ne -> ne.toAttribute -case e => e -> new AttributeReference(e.sql, e.dataType, e.nullable)() +case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)() } val groupByAttrs = groupByMap.map(_._2) @@ -135,9 +137,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { def patchAggregateFunctionChildren( af: AggregateFunction)( attrs: Expression => Expression): AggregateFunction = { -af.withNewChildren(af.children.map { - case afc => attrs(afc) -}).asInstanceOf[AggregateFunction] + af.withNewChildren(af.children.map(attrs)).asInstanceOf[AggregateFunction] } // Setup unique distinct aggregate children. @@ -265,5 +265,5 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // NamedExpression. This is done to prevent collisions between distinct and regular aggregate // children, in this case attribute reuse causes the input of the regular aggregate to bound to // the (nulled out) input of the distinct aggregate. -e -> new AttributeReference(e.sql, e.dataType, true)() +e -> AttributeReference(e.sql, e.dataType, nullable = true)() } http:/
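For reference, the query shape this patch re-enables can also be written with the DataFrame API: one distinct aggregate (count(distinct col2)) combined with an aggregate that cannot be planned partially (collect_list). Table and column names below are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, countDistinct}

// Usage sketch of the SPARK-17616 query shape.
object DistinctWithCollectList {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("distinct-agg").master("local[2]").getOrCreate()
    import spark.implicits._

    val tblA = Seq((1, "a", 10), (1, "b", 10), (2, "c", 20)).toDF("grp", "col1", "col2")

    // Equivalent to: SELECT grp, collect_list(col1), count(DISTINCT col2) FROM tbl_a GROUP BY 1
    tblA.groupBy($"grp")
      .agg(collect_list($"col1"), countDistinct($"col2"))
      .show()

    spark.stop()
  }
}
```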
spark git commit: [SPARK-17616][SQL] Support a single distinct aggregate combined with a non-partial aggregate
Repository: spark Updated Branches: refs/heads/master 3cdae0ff2 -> 0d6348750 [SPARK-17616][SQL] Support a single distinct aggregate combined with a non-partial aggregate ## What changes were proposed in this pull request? We currently cannot execute an aggregate that contains a single distinct aggregate function and an one or more non-partially plannable aggregate functions, for example: ```sql select grp, collect_list(col1), count(distinct col2) from tbl_a group by 1 ``` This is a regression from Spark 1.6. This is caused by the fact that the single distinct aggregation code path assumes that all aggregates can be planned in two phases (is partially aggregatable). This PR works around this issue by triggering the `RewriteDistinctAggregates` in such cases (this is similar to the approach taken in 1.6). ## How was this patch tested? Created `RewriteDistinctAggregatesSuite` which checks if the aggregates with distinct aggregate functions get rewritten into two `Aggregates` and an `Expand`. Added a regression test to `DataFrameAggregateSuite`. Author: Herman van Hovell Closes #15187 from hvanhovell/SPARK-17616. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d634875 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d634875 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d634875 Branch: refs/heads/master Commit: 0d634875026ccf1eaf984996e9460d7673561f80 Parents: 3cdae0f Author: Herman van Hovell Authored: Thu Sep 22 14:29:27 2016 -0700 Committer: Herman van Hovell Committed: Thu Sep 22 14:29:27 2016 -0700 -- .../optimizer/RewriteDistinctAggregates.scala | 18 ++-- .../RewriteDistinctAggregatesSuite.scala| 94 .../spark/sql/DataFrameAggregateSuite.scala | 8 ++ 3 files changed, 111 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0d634875/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index 0f43e7b..d6a39ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -119,14 +119,16 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { .filter(_.isDistinct) .groupBy(_.aggregateFunction.children.toSet) -// Aggregation strategy can handle the query with single distinct -if (distinctAggGroups.size > 1) { +// Check if the aggregates contains functions that do not support partial aggregation. +val existsNonPartial = aggExpressions.exists(!_.aggregateFunction.supportsPartial) + +// Aggregation strategy can handle queries with a single distinct group and partial aggregates. +if (distinctAggGroups.size > 1 || (distinctAggGroups.size == 1 && existsNonPartial)) { // Create the attributes for the grouping id and the group by clause. 
- val gid = -new AttributeReference("gid", IntegerType, false)(isGenerated = true) + val gid = AttributeReference("gid", IntegerType, nullable = false)(isGenerated = true) val groupByMap = a.groupingExpressions.collect { case ne: NamedExpression => ne -> ne.toAttribute -case e => e -> new AttributeReference(e.sql, e.dataType, e.nullable)() +case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)() } val groupByAttrs = groupByMap.map(_._2) @@ -135,9 +137,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { def patchAggregateFunctionChildren( af: AggregateFunction)( attrs: Expression => Expression): AggregateFunction = { -af.withNewChildren(af.children.map { - case afc => attrs(afc) -}).asInstanceOf[AggregateFunction] + af.withNewChildren(af.children.map(attrs)).asInstanceOf[AggregateFunction] } // Setup unique distinct aggregate children. @@ -265,5 +265,5 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // NamedExpression. This is done to prevent collisions between distinct and regular aggregate // children, in this case attribute reuse causes the input of the regular aggregate to bound to // the (nulled out) input of the distinct aggregate. -e -> new AttributeReference(e.sql, e.dataType, true)() +e -> AttributeReference(e.sql, e.dataType, nullable = true)() } http://git-wip-us.apache.org/repos/a
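The decision logic added above is small enough to isolate. The sketch below restates it with stand-in types (Agg is not Catalyst's AggregateExpression): the rewrite into Expand plus two Aggregates now also fires when there is exactly one distinct group but some aggregate function does not support partial aggregation.

```scala
// Self-contained sketch of the rewrite trigger condition.
object RewriteTriggerSketch {
  final case class Agg(children: Set[String], isDistinct: Boolean, supportsPartial: Boolean)

  def needsRewrite(aggs: Seq[Agg]): Boolean = {
    val distinctGroups = aggs.filter(_.isDistinct).groupBy(_.children)
    val existsNonPartial = aggs.exists(!_.supportsPartial)
    distinctGroups.size > 1 || (distinctGroups.size == 1 && existsNonPartial)
  }

  def main(args: Array[String]): Unit = {
    val collectList   = Agg(Set("col1"), isDistinct = false, supportsPartial = false)
    val countDistinct = Agg(Set("col2"), isDistinct = true,  supportsPartial = true)
    println(needsRewrite(Seq(collectList, countDistinct)))  // true: one distinct group + a non-partial aggregate
    println(needsRewrite(Seq(countDistinct)))               // false: plain single-distinct planning still applies
  }
}
```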
spark git commit: [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead
Repository: spark Updated Branches: refs/heads/branch-2.0 243bdb11d -> 47fc0b9f4 [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead ## What changes were proposed in this pull request? When the Python process is dead, the JVM StreamingContext is still running. Hence we will see a lot of Py4jException before the JVM process exits. It's better to stop the JVM StreamingContext to avoid those annoying logs. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #15201 from zsxwing/stop-jvm-ssc. (cherry picked from commit 3cdae0ff2f45643df7bc198cb48623526c7eb1a6) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47fc0b9f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47fc0b9f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47fc0b9f Branch: refs/heads/branch-2.0 Commit: 47fc0b9f40d814bc8e19f86dad591d4aed467222 Parents: 243bdb1 Author: Shixiong Zhu Authored: Thu Sep 22 14:26:45 2016 -0700 Committer: Shixiong Zhu Committed: Thu Sep 22 14:26:53 2016 -0700 -- .../streaming/api/python/PythonDStream.scala| 33 ++-- .../streaming/scheduler/JobGenerator.scala | 2 ++ .../streaming/scheduler/JobScheduler.scala | 2 ++ 3 files changed, 35 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/47fc0b9f/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index aeff4d7..46bfc60 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList} import scala.collection.JavaConverters._ import scala.language.existentials +import py4j.Py4JException + import org.apache.spark.SparkException import org.apache.spark.api.java._ +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Interval, Time} +import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time} import org.apache.spark.streaming.api.java._ import org.apache.spark.streaming.dstream._ import org.apache.spark.util.Utils @@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer { /** * Helper functions, which are called from Python via Py4J. */ -private[python] object PythonDStream { +private[streaming] object PythonDStream { /** * can not access PythonTransformFunctionSerializer.register() via Py4j @@ -184,6 +187,32 @@ private[python] object PythonDStream { rdds.asScala.foreach(queue.add) queue } + + /** + * Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot + * stop it in the Python side. 
+ */ + def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = { +// These two special messages are from: +// scalastyle:off +// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218 +// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340 +// scalastyle:on +if (e.isInstanceOf[Py4JException] && + ("Cannot obtain a new communication channel" == e.getMessage || +"Error while obtaining a new communication channel" == e.getMessage)) { + // Start a new thread to stop StreamingContext to avoid deadlock. + new Thread("Stop-StreamingContext") with Logging { +setDaemon(true) + +override def run(): Unit = { + logError( +"Cannot connect to Python process. It's probably dead. Stopping StreamingContext.", e) + StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) +} + }.start() +} + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/47fc0b9f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 19c88f1..4489a53 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/a
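A rough sketch of how the scheduler side can route callback failures into the new helper (the JobGenerator/JobScheduler hunks are truncated above, so the call-site names here are assumptions, and this only compiles inside Spark's streaming module where `PythonDStream` is visible):

```scala
import scala.util.control.NonFatal

import org.apache.spark.streaming.api.python.PythonDStream

// Hypothetical call site: run the Py4J callback, and on failure let the helper decide
// whether the Py4JException means the Python process is gone. Only the two specific
// "communication channel" messages trigger a stop, and the stop runs on a separate
// daemon thread so the scheduler itself is not blocked.
try {
  pythonTransformFunction.call(batchTime.milliseconds, rdds) // hypothetical callback invocation
} catch {
  case NonFatal(e) =>
    PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    throw e
}
```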
spark git commit: [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead
Repository: spark Updated Branches: refs/heads/master 85d609cf2 -> 3cdae0ff2 [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead ## What changes were proposed in this pull request? When the Python process is dead, the JVM StreamingContext is still running. Hence we will see a lot of Py4jException before the JVM process exits. It's better to stop the JVM StreamingContext to avoid those annoying logs. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #15201 from zsxwing/stop-jvm-ssc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cdae0ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cdae0ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cdae0ff Branch: refs/heads/master Commit: 3cdae0ff2f45643df7bc198cb48623526c7eb1a6 Parents: 85d609c Author: Shixiong Zhu Authored: Thu Sep 22 14:26:45 2016 -0700 Committer: Shixiong Zhu Committed: Thu Sep 22 14:26:45 2016 -0700 -- .../streaming/api/python/PythonDStream.scala| 33 ++-- .../streaming/scheduler/JobGenerator.scala | 2 ++ .../streaming/scheduler/JobScheduler.scala | 2 ++ 3 files changed, 35 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3cdae0ff/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index aeff4d7..46bfc60 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList} import scala.collection.JavaConverters._ import scala.language.existentials +import py4j.Py4JException + import org.apache.spark.SparkException import org.apache.spark.api.java._ +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Interval, Time} +import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time} import org.apache.spark.streaming.api.java._ import org.apache.spark.streaming.dstream._ import org.apache.spark.util.Utils @@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer { /** * Helper functions, which are called from Python via Py4J. */ -private[python] object PythonDStream { +private[streaming] object PythonDStream { /** * can not access PythonTransformFunctionSerializer.register() via Py4j @@ -184,6 +187,32 @@ private[python] object PythonDStream { rdds.asScala.foreach(queue.add) queue } + + /** + * Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot + * stop it in the Python side. 
+ */ + def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = { +// These two special messages are from: +// scalastyle:off +// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218 +// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340 +// scalastyle:on +if (e.isInstanceOf[Py4JException] && + ("Cannot obtain a new communication channel" == e.getMessage || +"Error while obtaining a new communication channel" == e.getMessage)) { + // Start a new thread to stop StreamingContext to avoid deadlock. + new Thread("Stop-StreamingContext") with Logging { +setDaemon(true) + +override def run(): Unit = { + logError( +"Cannot connect to Python process. It's probably dead. Stopping StreamingContext.", e) + StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) +} + }.start() +} + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/3cdae0ff/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 10d64f9..8d83dc8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -22,6 +22,7 @@ import scala.util.{Failure, Success, T
spark git commit: [SPARK-17613] S3A base paths with no '/' at the end return empty DataFrames
Repository: spark Updated Branches: refs/heads/branch-2.0 f14f47f07 -> 243bdb11d [SPARK-17613] S3A base paths with no '/' at the end return empty DataFrames Consider you have a bucket as `s3a://some-bucket` and under it you have files: ``` s3a://some-bucket/file1.parquet s3a://some-bucket/file2.parquet ``` Getting the parent path of `s3a://some-bucket/file1.parquet` yields `s3a://some-bucket/` and the ListingFileCatalog uses this as the key in the hash map. When catalog.allFiles is called, we use `s3a://some-bucket` (no slash at the end) to get the list of files, and we're left with an empty list! This PR fixes this by adding a `/` at the end of the `URI` iff the given `Path` doesn't have a parent, i.e. is the root. This is a no-op if the path already had a `/` at the end, and is handled through the Hadoop Path, path merging semantics. Unit test in `FileCatalogSuite`. Author: Burak Yavuz Closes #15169 from brkyvz/SPARK-17613. (cherry picked from commit 85d609cf25c1da2df3cd4f5d5aeaf3cbcf0d674c) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/243bdb11 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/243bdb11 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/243bdb11 Branch: refs/heads/branch-2.0 Commit: 243bdb11d89ee379acae1ea1ed78df10797e86d1 Parents: f14f47f Author: Burak Yavuz Authored: Thu Sep 22 13:05:41 2016 -0700 Committer: Josh Rosen Committed: Thu Sep 22 13:06:15 2016 -0700 -- .../PartitioningAwareFileCatalog.scala | 10 - .../datasources/FileCatalogSuite.scala | 45 +++- 2 files changed, 53 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/243bdb11/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index cef9d4d..2130c27 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -76,7 +76,15 @@ abstract class PartitioningAwareFileCatalog( paths.flatMap { path => // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). val fs = path.getFileSystem(hadoopConf) -val qualifiedPath = fs.makeQualified(path) +val qualifiedPathPre = fs.makeQualified(path) +val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) { + // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories, + // because the `leafFile.getParent` would have returned an absolute path with the + // separator at the end. + new Path(qualifiedPathPre, Path.SEPARATOR) +} else { + qualifiedPathPre +} // There are three cases possible with each path // 1. The path is a directory and has children files in it. 
Then it must be present in http://git-wip-us.apache.org/repos/asf/spark/blob/243bdb11/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index 0d9ea51..563f340 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -18,10 +18,12 @@ package org.apache.spark.sql.execution.datasources import java.io.File +import java.net.URI +import scala.collection.mutable import scala.language.reflectiveCalls -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test.SharedSQLContext @@ -67,4 +69,45 @@ class FileCatalogSuite extends SharedSQLContext { } } + + test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") { +class MockCatalog( + override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) { + + override def refresh(): Unit = {} + + override def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = mutable.LinkedHashMap( +new Path("
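The normalization itself can be pictured with a small standalone sketch (Hadoop `Path` only; the production code above additionally requires `!qualifiedPathPre.isAbsolute` before appending the separator):

```scala
import org.apache.hadoop.fs.Path

// "s3a://some-bucket" qualifies to the bucket root; appending Path.SEPARATOR through the
// Path(parent, child) constructor yields "s3a://some-bucket/" and is a no-op when the
// trailing slash is already there, so both spellings end up as the same catalog key.
def normalize(p: Path): Path =
  if (p.isRoot) new Path(p, Path.SEPARATOR) else p

// Expected: normalize(new Path("s3a://some-bucket")).toUri.toString == "s3a://some-bucket/"
```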
spark git commit: [SPARK-17613] S3A base paths with no '/' at the end return empty DataFrames
Repository: spark Updated Branches: refs/heads/master 9f24a17c5 -> 85d609cf2 [SPARK-17613] S3A base paths with no '/' at the end return empty DataFrames ## What changes were proposed in this pull request? Consider you have a bucket as `s3a://some-bucket` and under it you have files: ``` s3a://some-bucket/file1.parquet s3a://some-bucket/file2.parquet ``` Getting the parent path of `s3a://some-bucket/file1.parquet` yields `s3a://some-bucket/` and the ListingFileCatalog uses this as the key in the hash map. When catalog.allFiles is called, we use `s3a://some-bucket` (no slash at the end) to get the list of files, and we're left with an empty list! This PR fixes this by adding a `/` at the end of the `URI` iff the given `Path` doesn't have a parent, i.e. is the root. This is a no-op if the path already had a `/` at the end, and is handled through the Hadoop Path, path merging semantics. ## How was this patch tested? Unit test in `FileCatalogSuite`. Author: Burak Yavuz Closes #15169 from brkyvz/SPARK-17613. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85d609cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85d609cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85d609cf Branch: refs/heads/master Commit: 85d609cf25c1da2df3cd4f5d5aeaf3cbcf0d674c Parents: 9f24a17 Author: Burak Yavuz Authored: Thu Sep 22 13:05:41 2016 -0700 Committer: Josh Rosen Committed: Thu Sep 22 13:05:41 2016 -0700 -- .../PartitioningAwareFileCatalog.scala | 10 - .../datasources/FileCatalogSuite.scala | 45 +++- 2 files changed, 53 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/85d609cf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index d2d5b56..702ba97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -76,7 +76,15 @@ abstract class PartitioningAwareFileCatalog( paths.flatMap { path => // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). val fs = path.getFileSystem(hadoopConf) -val qualifiedPath = fs.makeQualified(path) +val qualifiedPathPre = fs.makeQualified(path) +val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) { + // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories, + // because the `leafFile.getParent` would have returned an absolute path with the + // separator at the end. + new Path(qualifiedPathPre, Path.SEPARATOR) +} else { + qualifiedPathPre +} // There are three cases possible with each path // 1. The path is a directory and has children files in it. 
Then it must be present in http://git-wip-us.apache.org/repos/asf/spark/blob/85d609cf/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index 5c8d322..fa3abd0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -18,10 +18,12 @@ package org.apache.spark.sql.execution.datasources import java.io.File +import java.net.URI +import scala.collection.mutable import scala.language.reflectiveCalls -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test.SharedSQLContext @@ -78,4 +80,45 @@ class FileCatalogSuite extends SharedSQLContext { assert(catalog1.listLeafFiles(catalog1.paths).isEmpty) } } + + test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") { +class MockCatalog( + override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) { + + override def refresh(): Unit = {} + + override def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = mut
spark git commit: Skip building R vignettes if Spark is not built
Repository: spark Updated Branches: refs/heads/branch-2.0 b25a8e6e1 -> f14f47f07 Skip building R vignettes if Spark is not built ## What changes were proposed in this pull request? When we build the docs separately we don't have the JAR files from the Spark build in the same tree. As the SparkR vignettes need to launch a SparkContext to be built, we skip building them if JAR files don't exist ## How was this patch tested? To test this we can run the following: ``` build/mvn -DskipTests -Psparkr clean ./R/create-docs.sh ``` You should see a line `Skipping R vignettes as Spark JARs not found` at the end Author: Shivaram Venkataraman Closes #15200 from shivaram/sparkr-vignette-skip. (cherry picked from commit 9f24a17c59b1130d97efa7d313c06577f7344338) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f14f47f0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f14f47f0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f14f47f0 Branch: refs/heads/branch-2.0 Commit: f14f47f072a392df0ebe908f1c57b6eb858105b7 Parents: b25a8e6 Author: Shivaram Venkataraman Authored: Thu Sep 22 11:52:42 2016 -0700 Committer: Reynold Xin Committed: Thu Sep 22 11:54:51 2016 -0700 -- R/create-docs.sh | 25 ++--- 1 file changed, 22 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f14f47f0/R/create-docs.sh -- diff --git a/R/create-docs.sh b/R/create-docs.sh index 0dfba22..69ffc5f 100755 --- a/R/create-docs.sh +++ b/R/create-docs.sh @@ -30,6 +30,13 @@ set -e # Figure out where the script is export FWDIR="$(cd "`dirname "$0"`"; pwd)" +export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" + +# Required for setting SPARK_SCALA_VERSION +. "${SPARK_HOME}"/bin/load-spark-env.sh + +echo "Using Scala $SPARK_SCALA_VERSION" + pushd $FWDIR # Install the package (this will also generate the Rd files) @@ -45,9 +52,21 @@ Rscript -e 'libDir <- "../../lib"; library(SparkR, lib.loc=libDir); library(knit popd -# render creates SparkR vignettes -Rscript -e 'library(rmarkdown); paths <- .libPaths(); .libPaths(c("lib", paths)); Sys.setenv(SPARK_HOME=tools::file_path_as_absolute("..")); render("pkg/vignettes/sparkr-vignettes.Rmd"); .libPaths(paths)' +# Find Spark jars. +if [ -f "${SPARK_HOME}/RELEASE" ]; then + SPARK_JARS_DIR="${SPARK_HOME}/jars" +else + SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars" +fi + +# Only create vignettes if Spark JARs exist +if [ -d "$SPARK_JARS_DIR" ]; then + # render creates SparkR vignettes + Rscript -e 'library(rmarkdown); paths <- .libPaths(); .libPaths(c("lib", paths)); Sys.setenv(SPARK_HOME=tools::file_path_as_absolute("..")); render("pkg/vignettes/sparkr-vignettes.Rmd"); .libPaths(paths)' -find pkg/vignettes/. -not -name '.' -not -name '*.Rmd' -not -name '*.md' -not -name '*.pdf' -not -name '*.html' -delete + find pkg/vignettes/. -not -name '.' -not -name '*.Rmd' -not -name '*.md' -not -name '*.pdf' -not -name '*.html' -delete +else + echo "Skipping R vignettes as Spark JARs not found in $SPARK_HOME" +fi popd - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Skip building R vignettes if Spark is not built
Repository: spark Updated Branches: refs/heads/master 17b72d31e -> 9f24a17c5 Skip building R vignettes if Spark is not built ## What changes were proposed in this pull request? When we build the docs separately we don't have the JAR files from the Spark build in the same tree. As the SparkR vignettes need to launch a SparkContext to be built, we skip building them if JAR files don't exist ## How was this patch tested? To test this we can run the following: ``` build/mvn -DskipTests -Psparkr clean ./R/create-docs.sh ``` You should see a line `Skipping R vignettes as Spark JARs not found` at the end Author: Shivaram Venkataraman Closes #15200 from shivaram/sparkr-vignette-skip. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f24a17c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f24a17c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f24a17c Branch: refs/heads/master Commit: 9f24a17c59b1130d97efa7d313c06577f7344338 Parents: 17b72d3 Author: Shivaram Venkataraman Authored: Thu Sep 22 11:52:42 2016 -0700 Committer: Reynold Xin Committed: Thu Sep 22 11:52:42 2016 -0700 -- R/create-docs.sh | 25 ++--- 1 file changed, 22 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9f24a17c/R/create-docs.sh -- diff --git a/R/create-docs.sh b/R/create-docs.sh index 0dfba22..69ffc5f 100755 --- a/R/create-docs.sh +++ b/R/create-docs.sh @@ -30,6 +30,13 @@ set -e # Figure out where the script is export FWDIR="$(cd "`dirname "$0"`"; pwd)" +export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" + +# Required for setting SPARK_SCALA_VERSION +. "${SPARK_HOME}"/bin/load-spark-env.sh + +echo "Using Scala $SPARK_SCALA_VERSION" + pushd $FWDIR # Install the package (this will also generate the Rd files) @@ -45,9 +52,21 @@ Rscript -e 'libDir <- "../../lib"; library(SparkR, lib.loc=libDir); library(knit popd -# render creates SparkR vignettes -Rscript -e 'library(rmarkdown); paths <- .libPaths(); .libPaths(c("lib", paths)); Sys.setenv(SPARK_HOME=tools::file_path_as_absolute("..")); render("pkg/vignettes/sparkr-vignettes.Rmd"); .libPaths(paths)' +# Find Spark jars. +if [ -f "${SPARK_HOME}/RELEASE" ]; then + SPARK_JARS_DIR="${SPARK_HOME}/jars" +else + SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars" +fi + +# Only create vignettes if Spark JARs exist +if [ -d "$SPARK_JARS_DIR" ]; then + # render creates SparkR vignettes + Rscript -e 'library(rmarkdown); paths <- .libPaths(); .libPaths(c("lib", paths)); Sys.setenv(SPARK_HOME=tools::file_path_as_absolute("..")); render("pkg/vignettes/sparkr-vignettes.Rmd"); .libPaths(paths)' -find pkg/vignettes/. -not -name '.' -not -name '*.Rmd' -not -name '*.md' -not -name '*.pdf' -not -name '*.html' -delete + find pkg/vignettes/. -not -name '.' -not -name '*.Rmd' -not -name '*.md' -not -name '*.pdf' -not -name '*.html' -delete +else + echo "Skipping R vignettes as Spark JARs not found in $SPARK_HOME" +fi popd - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job (branch-1.6 backport)
Repository: spark Updated Branches: refs/heads/branch-1.6 ce0a222f5 -> 94524cef4 [SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job (branch-1.6 backport) This patch is a branch-1.6 backport of #15037: ## What changes were proposed in this pull request? In Spark's `RDD.getOrCompute` we first try to read a local copy of a cached RDD block, then a remote copy, and only fall back to recomputing the block if no cached copy (local or remote) can be read. This logic works correctly in the case where no remote copies of the block exist, but if there _are_ remote copies and reads of those copies fail (due to network issues or internal Spark bugs) then the BlockManager will throw a `BlockFetchException` that will fail the task (and which could possibly fail the whole job if the read failures keep occurring). In the cases of TorrentBroadcast and task result fetching we really do want to fail the entire job in case no remote blocks can be fetched, but this logic is inappropriate for reads of cached RDD blocks because those can/should be recomputed in case cached blocks are unavailable. Therefore, I think that the `BlockManager.getRemoteBytes()` method should never throw on remote fetch errors and, instead, should handle failures by returning `None`. ## How was this patch tested? Block manager changes should be covered by modified tests in `BlockManagerSuite`: the old tests expected exceptions to be thrown on failed remote reads, while the modified tests now expect `None` to be returned from the `getRemote*` method. I also manually inspected all usages of `BlockManager.getRemoteValues()`, `getRemoteBytes()`, and `get()` to verify that they correctly pattern-match on the result and handle `None`. Note that these `None` branches are already exercised because the old `getRemoteBytes` returned `None` when no remote locations for the block could be found (which could occur if an executor died and its block manager de-registered with the master). Author: Josh Rosen Closes #15186 from JoshRosen/SPARK-17485-branch-1.6-backport. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94524cef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94524cef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94524cef Branch: refs/heads/branch-1.6 Commit: 94524cef4cf367a0e73ebe0e919cc21f25f1043f Parents: ce0a222 Author: Josh Rosen Authored: Thu Sep 22 11:05:35 2016 -0700 Committer: Josh Rosen Committed: Thu Sep 22 11:05:35 2016 -0700 -- .../spark/storage/BlockFetchException.scala | 24 .../org/apache/spark/storage/BlockManager.scala | 3 ++- .../spark/storage/BlockManagerSuite.scala | 7 +++--- 3 files changed, 5 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/94524cef/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala deleted file mode 100644 index f6e46ae..000 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -import org.apache.spark.SparkException - -private[spark] -case class BlockFetchException(messages: String, throwable: Throwable) - extends SparkException(messages, throwable) http://git-wip-us.apache.org/repos/asf/spark/blob/94524cef/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 339ee144..1fc6f39 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/o
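The caller-side pattern the description argues for looks roughly like the following (a sketch as seen from inside Spark's storage layer; `deserializeBlock` and `recomputeBlock` are stand-ins for the real fallback logic, not actual method names):

```scala
// After this change a failed remote fetch of a cached RDD block surfaces as None rather
// than a thrown BlockFetchException, so the task can fall back to recomputation instead
// of failing outright.
blockManager.getRemoteBytes(blockId) match {
  case Some(bytes) => deserializeBlock(blockId, bytes)  // use the cached remote copy
  case None        => recomputeBlock(blockId)           // recompute; do not fail the task
}
```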
spark git commit: [SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.
Repository: spark Updated Branches: refs/heads/master 8a02410a9 -> 17b72d31e [SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time. ## What changes were proposed in this pull request? We are killing multiple executors together instead of iterating over expensive RPC calls to kill single executor. ## How was this patch tested? Executed sample spark job to observe executors being killed/removed with dynamic allocation enabled. Author: Dhruve Ashar Author: Dhruve Ashar Closes #15152 from dhruve/impr/SPARK-17365. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17b72d31 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17b72d31 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17b72d31 Branch: refs/heads/master Commit: 17b72d31e0c59711eddeb525becb8085930eadcc Parents: 8a02410 Author: Dhruve Ashar Authored: Thu Sep 22 10:10:37 2016 -0700 Committer: Marcelo Vanzin Committed: Thu Sep 22 10:10:37 2016 -0700 -- .../apache/spark/ExecutorAllocationClient.scala | 9 +- .../spark/ExecutorAllocationManager.scala | 86 +--- .../scala/org/apache/spark/SparkContext.scala | 24 ++-- .../cluster/CoarseGrainedSchedulerBackend.scala | 12 +- ...pache.spark.scheduler.ExternalClusterManager | 3 +- .../spark/ExecutorAllocationManagerSuite.scala | 135 +-- .../StandaloneDynamicAllocationSuite.scala | 6 +- project/MimaExcludes.scala | 3 + .../scheduler/ExecutorAllocationManager.scala | 2 +- .../streaming/scheduler/JobScheduler.scala | 9 +- .../ExecutorAllocationManagerSuite.scala| 5 +- 11 files changed, 239 insertions(+), 55 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 8baddf4..5d47f62 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -54,13 +54,16 @@ private[spark] trait ExecutorAllocationClient { /** * Request that the cluster manager kill the specified executors. - * @return whether the request is acknowledged by the cluster manager. + * @return the ids of the executors acknowledged by the cluster manager to be removed. */ - def killExecutors(executorIds: Seq[String]): Boolean + def killExecutors(executorIds: Seq[String]): Seq[String] /** * Request that the cluster manager kill the specified executor. * @return whether the request is acknowledged by the cluster manager. 
*/ - def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) + def killExecutor(executorId: String): Boolean = { +val killedExecutors = killExecutors(Seq(executorId)) +killedExecutors.nonEmpty && killedExecutors(0).equals(executorId) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 6f320c5..1366251 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.util.concurrent.TimeUnit import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.util.control.ControlThrowable import com.codahale.metrics.{Gauge, MetricRegistry} @@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager( updateAndSyncNumExecutorsTarget(now) +val executorIdsToBeRemoved = ArrayBuffer[String]() removeTimes.retain { case (executorId, expireTime) => val expired = now >= expireTime if (expired) { initializing = false -removeExecutor(executorId) +executorIdsToBeRemoved += executorId } !expired } +if (executorIdsToBeRemoved.nonEmpty) { + removeExecutors(executorIdsToBeRemoved) +} } /** @@ -392,10 +397,66 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. + * Returns the list of executors which are removed. + */ + private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized { +val executorIdsToBeRemoved = new ArrayBuffer[String]
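The batching idea reduces to something like the sketch below, written as if it lived inside Spark core (`ExecutorAllocationClient` is `private[spark]`, so this is not callable from user code, and the real `removeExecutors` also respects lower bounds and tracks pending removals):

```scala
import scala.collection.mutable.ArrayBuffer

// Collect every executor whose idle timeout has expired, then issue one batched
// killExecutors RPC instead of a round trip per executor. removeTimes maps
// executor id -> expiry time, as in ExecutorAllocationManager.
def removeExpired(
    client: ExecutorAllocationClient,
    removeTimes: Map[String, Long],
    now: Long): Seq[String] = {
  val toRemove = ArrayBuffer[String]()
  removeTimes.foreach { case (executorId, expireTime) =>
    if (now >= expireTime) toRemove += executorId
  }
  // killExecutors now returns the ids the cluster manager actually acknowledged.
  if (toRemove.nonEmpty) client.killExecutors(toRemove) else Seq.empty
}
```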
spark git commit: [SQL][MINOR] correct the comment of SortBasedAggregationIterator.safeProj
Repository: spark Updated Branches: refs/heads/master 72d9fba26 -> 8a02410a9 [SQL][MINOR] correct the comment of SortBasedAggregationIterator.safeProj ## What changes were proposed in this pull request? This comment went stale long time ago, this PR fixes it according to my understanding. ## How was this patch tested? N/A Author: Wenchen Fan Closes #15095 from cloud-fan/update-comment. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a02410a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a02410a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a02410a Branch: refs/heads/master Commit: 8a02410a92429bff50d6ce082f873cea9e9fa91e Parents: 72d9fba Author: Wenchen Fan Authored: Thu Sep 22 23:25:32 2016 +0800 Committer: Cheng Lian Committed: Thu Sep 22 23:25:32 2016 +0800 -- .../aggregate/SortBasedAggregationIterator.scala | 11 +-- 1 file changed, 9 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8a02410a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala index 3f7f849..c2b1ef0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala @@ -86,8 +86,15 @@ class SortBasedAggregationIterator( // The aggregation buffer used by the sort-based aggregation. private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer - // A SafeProjection to turn UnsafeRow into GenericInternalRow, because UnsafeRow can't be - // compared to MutableRow (aggregation buffer) directly. + // This safe projection is used to turn the input row into safe row. This is necessary + // because the input row may be produced by unsafe projection in child operator and all the + // produced rows share one byte array. However, when we update the aggregate buffer according to + // the input row, we may cache some values from input row, e.g. `Max` will keep the max value from + // input row via MutableProjection, `CollectList` will keep all values in an array via + // ImperativeAggregate framework. These values may get changed unexpectedly if the underlying + // unsafe projection update the shared byte array. By applying a safe projection to the input row, + // we can cut down the connection from input row to the shared byte array, and thus it's safe to + // cache values from input row while updating the aggregation buffer. private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType)) protected def initialize(): Unit = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
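The aliasing hazard the rewritten comment describes can be shown with a plain-Scala toy (an analogy only, not Spark internals):

```scala
// Caching a reference into a reused buffer goes stale once the buffer is overwritten;
// copying first, which is what the safe projection effectively does, keeps the value intact.
val sharedBuffer = Array[Any]("max-so-far")
val cachedByReference = sharedBuffer    // like keeping the unsafe row itself
val cachedByCopy = sharedBuffer.clone() // like projecting to a safe (copied) row first
sharedBuffer(0) = "next-input-row"
assert(cachedByReference(0) == "next-input-row") // silently changed under the cache
assert(cachedByCopy(0) == "max-so-far")          // still the intended value
```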
spark git commit: [SPARK-17281][ML][MLLIB] Add treeAggregateDepth parameter for AFTSurvivalRegression
Repository: spark Updated Branches: refs/heads/master 646f38346 -> 72d9fba26 [SPARK-17281][ML][MLLIB] Add treeAggregateDepth parameter for AFTSurvivalRegression ## What changes were proposed in this pull request? Add treeAggregateDepth parameter for AFTSurvivalRegression to keep consistent with LiR/LoR. ## How was this patch tested? Existing tests. Author: WeichenXu Closes #14851 from WeichenXu123/add_treeAggregate_param_for_survival_regression. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72d9fba2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72d9fba2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72d9fba2 Branch: refs/heads/master Commit: 72d9fba26c19aae73116fd0d00b566967934c6fc Parents: 646f383 Author: WeichenXu Authored: Thu Sep 22 04:35:54 2016 -0700 Committer: Yanbo Liang Committed: Thu Sep 22 04:35:54 2016 -0700 -- .../ml/regression/AFTSurvivalRegression.scala | 24 python/pyspark/ml/regression.py | 11 + 2 files changed, 25 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/72d9fba2/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 3179f48..9d5ba99 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -46,7 +46,7 @@ import org.apache.spark.storage.StorageLevel */ private[regression] trait AFTSurvivalRegressionParams extends Params with HasFeaturesCol with HasLabelCol with HasPredictionCol with HasMaxIter - with HasTol with HasFitIntercept with Logging { + with HasTol with HasFitIntercept with HasAggregationDepth with Logging { /** * Param for censor column name. @@ -184,6 +184,17 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S setDefault(tol -> 1E-6) /** + * Suggested depth for treeAggregate (>= 2). + * If the dimensions of features or the number of partitions are large, + * this param could be adjusted to a larger size. + * Default is 2. + * @group expertSetParam + */ + @Since("2.1.0") + def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) + setDefault(aggregationDepth -> 2) + + /** * Extract [[featuresCol]], [[labelCol]] and [[censorCol]] from input dataset, * and put it in an RDD with strong types. 
*/ @@ -207,7 +218,9 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S val combOp = (c1: MultivariateOnlineSummarizer, c2: MultivariateOnlineSummarizer) => { c1.merge(c2) } - instances.treeAggregate(new MultivariateOnlineSummarizer)(seqOp, combOp) + instances.treeAggregate( +new MultivariateOnlineSummarizer + )(seqOp, combOp, $(aggregationDepth)) } val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) @@ -222,7 +235,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S val bcFeaturesStd = instances.context.broadcast(featuresStd) -val costFun = new AFTCostFun(instances, $(fitIntercept), bcFeaturesStd) +val costFun = new AFTCostFun(instances, $(fitIntercept), bcFeaturesStd, $(aggregationDepth)) val optimizer = new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) /* @@ -591,7 +604,8 @@ private class AFTAggregator( private class AFTCostFun( data: RDD[AFTPoint], fitIntercept: Boolean, -bcFeaturesStd: Broadcast[Array[Double]]) extends DiffFunction[BDV[Double]] { +bcFeaturesStd: Broadcast[Array[Double]], +aggregationDepth: Int) extends DiffFunction[BDV[Double]] { override def calculate(parameters: BDV[Double]): (Double, BDV[Double]) = { @@ -604,7 +618,7 @@ private class AFTCostFun( }, combOp = (c1, c2) => (c1, c2) match { case (aggregator1, aggregator2) => aggregator1.merge(aggregator2) - }) + }, depth = aggregationDepth) bcParameters.destroy(blocking = false) (aftAggregator.loss, aftAggregator.gradient) http://git-wip-us.apache.org/repos/asf/spark/blob/72d9fba2/python/pyspark/ml/regression.py -- diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py index 19afc72..55d3803 100644 --- a/python/pyspark/ml/regression.py +++ b/python/pyspark/ml/regression.py @@ -1088,7 +1088,8 @@ class GBTRegressionModel(TreeEnsembleModel, JavaPredi
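Using the new expert parameter from the public API looks like this (a sketch assuming a training DataFrame `training` with `features`, `label` and `censor` columns, which are the estimator's default column names rather than anything added by this diff):

```scala
import org.apache.spark.ml.regression.AFTSurvivalRegression

// aggregationDepth defaults to 2; raising it can help when the feature dimension or the
// number of partitions is large, matching the knob already exposed by LiR/LoR.
val aft = new AFTSurvivalRegression()
  .setCensorCol("censor")
  .setAggregationDepth(3)
val model = aft.fit(training)
println(s"coefficients=${model.coefficients} intercept=${model.intercept} scale=${model.scale}")
```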
spark git commit: [SPARK-17421][DOCS] Documenting the current treatment of MAVEN_OPTS.
Repository: spark Updated Branches: refs/heads/master de7df7def -> 646f38346 [SPARK-17421][DOCS] Documenting the current treatment of MAVEN_OPTS. ## What changes were proposed in this pull request? Modified the documentation to clarify that `build/mvn` and `pom.xml` always add Java 7-specific parameters to `MAVEN_OPTS`, and that developers can safely ignore warnings about `-XX:MaxPermSize` that may result from compiling or running tests with Java 8. ## How was this patch tested? Rebuilt HTML documentation, made sure that building-spark.html displays correctly in a browser. Author: frreiss Closes #15005 from frreiss/fred-17421a. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/646f3834 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/646f3834 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/646f3834 Branch: refs/heads/master Commit: 646f383465c123062cbcce288a127e23984c7c7f Parents: de7df7d Author: frreiss Authored: Thu Sep 22 10:31:15 2016 +0100 Committer: Sean Owen Committed: Thu Sep 22 10:31:15 2016 +0100 -- docs/building-spark.md | 20 ++-- 1 file changed, 14 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/646f3834/docs/building-spark.md -- diff --git a/docs/building-spark.md b/docs/building-spark.md index 6908fc1..75c304a3 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -16,11 +16,13 @@ Building Spark using Maven requires Maven 3.3.9 or newer and Java 7+. ### Setting up Maven's Memory Usage -You'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`. We recommend the following settings: +You'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`: -export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" +export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m" -If you don't run this, you may see errors like the following: +When compiling with Java 7, you will need to add the additional option "-XX:MaxPermSize=512M" to MAVEN_OPTS. + +If you don't add these parameters to `MAVEN_OPTS`, you may see errors and warnings like the following: [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes... [ERROR] PermGen space -> [Help 1] @@ -28,12 +30,18 @@ If you don't run this, you may see errors like the following: [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes... [ERROR] Java heap space -> [Help 1] -You can fix this by setting the `MAVEN_OPTS` variable as discussed before. +[INFO] Compiling 233 Scala sources and 41 Java sources to /Users/me/Development/spark/sql/core/target/scala-{site.SCALA_BINARY_VERSION}/classes... +OpenJDK 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled. +OpenJDK 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize= + +You can fix these problems by setting the `MAVEN_OPTS` variable as discussed before. **Note:** -* For Java 8 and above this step is not required. -* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automate this for you. +* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automatically add the above options to the `MAVEN_OPTS` environment variable. +* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`. 
+* You may see warnings like "ignoring option MaxPermSize=1g; support was removed in 8.0" when building or running tests with Java 8 and `build/mvn`. These warnings are harmless. + ### build/mvn - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17421][DOCS] Documenting the current treatment of MAVEN_OPTS.
Repository: spark Updated Branches: refs/heads/branch-2.0 e8b26be9b -> b25a8e6e1 [SPARK-17421][DOCS] Documenting the current treatment of MAVEN_OPTS. ## What changes were proposed in this pull request? Modified the documentation to clarify that `build/mvn` and `pom.xml` always add Java 7-specific parameters to `MAVEN_OPTS`, and that developers can safely ignore warnings about `-XX:MaxPermSize` that may result from compiling or running tests with Java 8. ## How was this patch tested? Rebuilt HTML documentation, made sure that building-spark.html displays correctly in a browser. Author: frreiss Closes #15005 from frreiss/fred-17421a. (cherry picked from commit 646f383465c123062cbcce288a127e23984c7c7f) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b25a8e6e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b25a8e6e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b25a8e6e Branch: refs/heads/branch-2.0 Commit: b25a8e6e167717fbe92e6a9b69a8a2510bf926ca Parents: e8b26be Author: frreiss Authored: Thu Sep 22 10:31:15 2016 +0100 Committer: Sean Owen Committed: Thu Sep 22 10:31:28 2016 +0100 -- docs/building-spark.md | 20 ++-- 1 file changed, 14 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b25a8e6e/docs/building-spark.md -- diff --git a/docs/building-spark.md b/docs/building-spark.md index 2c987cf..330df00 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -16,11 +16,13 @@ Building Spark using Maven requires Maven 3.3.9 or newer and Java 7+. ### Setting up Maven's Memory Usage -You'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`. We recommend the following settings: +You'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`: -export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" +export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m" -If you don't run this, you may see errors like the following: +When compiling with Java 7, you will need to add the additional option "-XX:MaxPermSize=512M" to MAVEN_OPTS. + +If you don't add these parameters to `MAVEN_OPTS`, you may see errors and warnings like the following: [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes... [ERROR] PermGen space -> [Help 1] @@ -28,12 +30,18 @@ If you don't run this, you may see errors like the following: [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes... [ERROR] Java heap space -> [Help 1] -You can fix this by setting the `MAVEN_OPTS` variable as discussed before. +[INFO] Compiling 233 Scala sources and 41 Java sources to /Users/me/Development/spark/sql/core/target/scala-{site.SCALA_BINARY_VERSION}/classes... +OpenJDK 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled. +OpenJDK 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize= + +You can fix these problems by setting the `MAVEN_OPTS` variable as discussed before. **Note:** -* For Java 8 and above this step is not required. -* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automate this for you. +* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automatically add the above options to the `MAVEN_OPTS` environment variable. 
+* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`. +* You may see warnings like "ignoring option MaxPermSize=1g; support was removed in 8.0" when building or running tests with Java 8 and `build/mvn`. These warnings are harmless. + ### build/mvn - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org