spark git commit: [SPARK-17717][SQL] Add exist/find methods to Catalog.
Repository: spark Updated Branches: refs/heads/master 2f7395670 -> 74ac1c438 [SPARK-17717][SQL] Add exist/find methods to Catalog. ## What changes were proposed in this pull request? The current user facing catalog does not implement methods for checking object existence or finding objects. You could theoretically do this using the `list*` commands, but this is rather cumbersome and can actually be costly when there are many objects. This PR adds `exists*` and `find*` methods for Databases, Table and Functions. ## How was this patch tested? Added tests to `org.apache.spark.sql.internal.CatalogSuite` Author: Herman van HovellCloses #15301 from hvanhovell/SPARK-17717. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74ac1c43 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74ac1c43 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74ac1c43 Branch: refs/heads/master Commit: 74ac1c43817c0b8da70342e540ec7638dd7d01bd Parents: 2f73956 Author: Herman van Hovell Authored: Thu Sep 29 17:56:32 2016 -0700 Committer: Reynold Xin Committed: Thu Sep 29 17:56:32 2016 -0700 -- project/MimaExcludes.scala | 11 +- .../org/apache/spark/sql/catalog/Catalog.scala | 83 ++ .../apache/spark/sql/internal/CatalogImpl.scala | 152 --- .../spark/sql/internal/CatalogSuite.scala | 118 ++ 4 files changed, 339 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/74ac1c43/project/MimaExcludes.scala -- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 4db3edb..2ffe0ac 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -46,7 +46,16 @@ object MimaExcludes { // [SPARK-16967] Move Mesos to Module ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"), // [SPARK-16240] ML persistence backward compatibility for LDA - ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$") + 
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"), + // [SPARK-17717] Add Find and Exists method to Catalog. + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findDatabase"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findTable"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findFunction"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findColumn"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.columnExists") ) } http://git-wip-us.apache.org/repos/asf/spark/blob/74ac1c43/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 1aed245..b439022 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -102,6 +102,89 @@ abstract class Catalog { def listColumns(dbName: String, tableName: String): Dataset[Column] /** + * Find the database with the specified name. This throws an AnalysisException when the database + * cannot be found. + * + * @since 2.1.0 + */ + @throws[AnalysisException]("database does not exist") + def findDatabase(dbName: String): Database + + /** + * Find the table with the specified name. This table can be a temporary table or a table in the + * current database. 
This throws an AnalysisException when the table cannot be found. + * + * @since 2.1.0 + */ + @throws[AnalysisException]("table does not exist") + def findTable(tableName: String): Table + + /** + * Find the table with the specified name in the specified database. This throws an + * AnalysisException when the table cannot be found. + * + * @since 2.1.0 + */ + @throws[AnalysisException]("database or table does not exist") + def findTable(dbName:
spark git commit: Updated the following PR with minor changes to allow cherry-pick to branch-2.0
Repository: spark Updated Branches: refs/heads/branch-2.0 0cdd7370a -> a99ea4c9e Updated the following PR with minor changes to allow cherry-pick to branch-2.0 [SPARK-17697][ML] Fixed bug in summary calculations that pattern match against label without casting In calling LogisticRegression.evaluate and GeneralizedLinearRegression.evaluate using a Dataset where the Label is not of a double type, calculations pattern match against a double and throw a MatchError. This fix casts the Label column to a DoubleType to ensure there is no MatchError. Added unit tests to call evaluate with a dataset that has Label as other numeric types. Author: Bryan CutlerCloses #15288 from BryanCutler/binaryLOR-numericCheck-SPARK-17697. (cherry picked from commit 2f739567080d804a942cfcca0e22f91ab7cbea36) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a99ea4c9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a99ea4c9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a99ea4c9 Branch: refs/heads/branch-2.0 Commit: a99ea4c9e0e2f91e4b524987788f0acee88e564d Parents: 0cdd737 Author: Bryan Cutler Authored: Thu Sep 29 16:31:30 2016 -0700 Committer: Joseph K. 
Bradley Committed: Thu Sep 29 16:56:34 2016 -0700 -- .../ml/classification/LogisticRegression.scala | 2 +- .../GeneralizedLinearRegression.scala | 11 + .../LogisticRegressionSuite.scala | 18 +- .../GeneralizedLinearRegressionSuite.scala | 25 4 files changed, 49 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a99ea4c9/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index cca3374..c50ee5d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -860,7 +860,7 @@ class BinaryLogisticRegressionSummary private[classification] ( // TODO: Allow the user to vary the number of bins using a setBins method in // BinaryClassificationMetrics. For now the default is set to 100. 
@transient private val binaryMetrics = new BinaryClassificationMetrics( -predictions.select(probabilityCol, labelCol).rdd.map { +predictions.select(col(probabilityCol), col(labelCol).cast(DoubleType)).rdd.map { case Row(score: Vector, label: Double) => (score(1), label) }, 100 ) http://git-wip-us.apache.org/repos/asf/spark/blob/a99ea4c9/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 2bdc09e..7f88c12 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -988,7 +988,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( } else { link.unlink(0.0) } -predictions.select(col(model.getLabelCol), w).rdd.map { +predictions.select(col(model.getLabelCol).cast(DoubleType), w).rdd.map { case Row(y: Double, weight: Double) => family.deviance(y, wtdmu, weight) }.sum() @@ -1000,7 +1000,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( @Since("2.0.0") lazy val deviance: Double = { val w = weightCol -predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map { +predictions.select(col(model.getLabelCol).cast(DoubleType), col(predictionCol), w).rdd.map { case Row(label: Double, pred: Double, weight: Double) => family.deviance(label, pred, weight) }.sum() @@ -1026,9 +1026,10 @@ class GeneralizedLinearRegressionSummary private[regression] ( lazy val aic: Double = { val w = weightCol val weightSum = predictions.select(w).agg(sum(w)).first().getDouble(0) -val t = predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map { - case Row(label: Double, pred: Double, weight: Double) => -(label, pred, weight) +val t = predictions.select( + 
col(model.getLabelCol).cast(DoubleType), col(predictionCol), w).rdd.map { +
spark git commit: [SPARK-17697][ML] Fixed bug in summary calculations that pattern match against label without casting
Repository: spark Updated Branches: refs/heads/master 39eb3bb1e -> 2f7395670 [SPARK-17697][ML] Fixed bug in summary calculations that pattern match against label without casting ## What changes were proposed in this pull request? In calling LogisticRegression.evaluate and GeneralizedLinearRegression.evaluate using a Dataset where the Label is not of a double type, calculations pattern match against a double and throw a MatchError. This fix casts the Label column to a DoubleType to ensure there is no MatchError. ## How was this patch tested? Added unit tests to call evaluate with a dataset that has Label as other numeric types. Author: Bryan CutlerCloses #15288 from BryanCutler/binaryLOR-numericCheck-SPARK-17697. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f739567 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f739567 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f739567 Branch: refs/heads/master Commit: 2f739567080d804a942cfcca0e22f91ab7cbea36 Parents: 39eb3bb Author: Bryan Cutler Authored: Thu Sep 29 16:31:30 2016 -0700 Committer: Joseph K. 
Bradley Committed: Thu Sep 29 16:31:30 2016 -0700 -- .../ml/classification/LogisticRegression.scala | 2 +- .../GeneralizedLinearRegression.scala | 11 + .../LogisticRegressionSuite.scala | 18 +- .../GeneralizedLinearRegressionSuite.scala | 25 4 files changed, 49 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2f739567/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 5ab63d1..329961a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -1169,7 +1169,7 @@ class BinaryLogisticRegressionSummary private[classification] ( // TODO: Allow the user to vary the number of bins using a setBins method in // BinaryClassificationMetrics. For now the default is set to 100. 
@transient private val binaryMetrics = new BinaryClassificationMetrics( -predictions.select(probabilityCol, labelCol).rdd.map { +predictions.select(col(probabilityCol), col(labelCol).cast(DoubleType)).rdd.map { case Row(score: Vector, label: Double) => (score(1), label) }, 100 ) http://git-wip-us.apache.org/repos/asf/spark/blob/2f739567/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 02b27fb..bb9e150 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -992,7 +992,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( } else { link.unlink(0.0) } -predictions.select(col(model.getLabelCol), w).rdd.map { +predictions.select(col(model.getLabelCol).cast(DoubleType), w).rdd.map { case Row(y: Double, weight: Double) => family.deviance(y, wtdmu, weight) }.sum() @@ -1004,7 +1004,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( @Since("2.0.0") lazy val deviance: Double = { val w = weightCol -predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map { +predictions.select(col(model.getLabelCol).cast(DoubleType), col(predictionCol), w).rdd.map { case Row(label: Double, pred: Double, weight: Double) => family.deviance(label, pred, weight) }.sum() @@ -1030,9 +1030,10 @@ class GeneralizedLinearRegressionSummary private[regression] ( lazy val aic: Double = { val w = weightCol val weightSum = predictions.select(w).agg(sum(w)).first().getDouble(0) -val t = predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map { - case Row(label: Double, pred: Double, weight: Double) => -(label, pred, weight) +val t = predictions.select( + 
col(model.getLabelCol).cast(DoubleType), col(predictionCol), w).rdd.map { +case Row(label: Double, pred: Double, weight: Double) => + (label, pred, weight) } family.aic(t, deviance,
spark git commit: [SPARK-17412][DOC] All test should not be run by `root` or any admin user
Repository: spark Updated Branches: refs/heads/master 3993ebca2 -> 39eb3bb1e [SPARK-17412][DOC] All test should not be run by `root` or any admin user ## What changes were proposed in this pull request? `FsHistoryProviderSuite` fails if `root` user runs it. The test case **SPARK-3697: ignore directories that cannot be read** depends on `setReadable(false, false)` to make test data files and expects the number of accessible files is 1. But, `root` can access all files, so it returns 2. This PR adds the assumption explicitly on doc. `building-spark.md`. ## How was this patch tested? This is a documentation change. Author: Dongjoon HyunCloses #15291 from dongjoon-hyun/SPARK-17412. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39eb3bb1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39eb3bb1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39eb3bb1 Branch: refs/heads/master Commit: 39eb3bb1ec29aa993de13a6eba3ab27db6fc5371 Parents: 3993ebc Author: Dongjoon Hyun Authored: Thu Sep 29 16:01:45 2016 -0700 Committer: Marcelo Vanzin Committed: Thu Sep 29 16:01:45 2016 -0700 -- docs/building-spark.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/39eb3bb1/docs/building-spark.md -- diff --git a/docs/building-spark.md b/docs/building-spark.md index 75c304a3..da7eeb8 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -215,6 +215,7 @@ For help in setting up IntelliJ IDEA or Eclipse for Spark development, and troub # Running Tests Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin). +Note that tests should not be run as root or an admin user. Some of the tests require Spark to be packaged first, so always run `mvn package` with `-DskipTests` the first time. 
The following is an example of a correct (build, test) sequence:

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17676][CORE] FsHistoryProvider should ignore hidden files
Repository: spark Updated Branches: refs/heads/master 29396e7d1 -> 3993ebca2 [SPARK-17676][CORE] FsHistoryProvider should ignore hidden files ## What changes were proposed in this pull request? FsHistoryProvider was writing a hidden file (to check the fs's clock). Even though it deleted the file immediately, sometimes another thread would try to scan the files on the fs in-between, and then there would be an error msg logged which was very misleading for the end-user. (The logged error was harmless, though.) ## How was this patch tested? I added one unit test, but to be clear, that test was passing before. The actual change in behavior in that test is just logging (after the change, there is no more logged error), which I just manually verified. Author: Imran RashidCloses #15250 from squito/SPARK-17676. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3993ebca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3993ebca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3993ebca Branch: refs/heads/master Commit: 3993ebca23afa4b8770695051635933a6c9d2c11 Parents: 29396e7 Author: Imran Rashid Authored: Thu Sep 29 15:40:35 2016 -0700 Committer: Marcelo Vanzin Committed: Thu Sep 29 15:40:35 2016 -0700 -- .../deploy/history/FsHistoryProvider.scala | 7 +++- .../deploy/history/FsHistoryProviderSuite.scala | 36 ++-- 2 files changed, 40 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3993ebca/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index d494ff0..c5740e4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -294,7 +294,12 @@ private[history] 
class FsHistoryProvider(conf: SparkConf, clock: Clock) .filter { entry => try { val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L) -!entry.isDirectory() && prevFileSize < entry.getLen() +!entry.isDirectory() && + // FsHistoryProvider generates a hidden file which can't be read. Accidentally + // reading a garbage file is safe, but we would log an error which can be scary to + // the end-user. + !entry.getPath().getName().startsWith(".") && + prevFileSize < entry.getLen() } catch { case e: AccessControlException => // Do not use "logInfo" since these messages can get pretty noisy if printed on http://git-wip-us.apache.org/repos/asf/spark/blob/3993ebca/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 39c5857..01bef0a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -17,8 +17,7 @@ package org.apache.spark.deploy.history -import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File, - FileOutputStream, OutputStreamWriter} +import java.io._ import java.net.URI import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit @@ -394,6 +393,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("ignore hidden files") { + +// FsHistoryProvider should ignore hidden files. (It even writes out a hidden file itself +// that should be ignored). 
+ +// write out one totally bogus hidden file +val hiddenGarbageFile = new File(testDir, ".garbage") +val out = new PrintWriter(hiddenGarbageFile) +// scalastyle:off println +out.println("GARBAGE") +// scalastyle:on println +out.close() + +// also write out one real event log file, but since its a hidden file, we shouldn't read it +val tmpNewAppFile = newLogFile("hidden", None, inProgress = false) +val hiddenNewAppFile = new File(tmpNewAppFile.getParentFile, "." + tmpNewAppFile.getName) +tmpNewAppFile.renameTo(hiddenNewAppFile) + +// and write one real file, which should still get picked up just fine +val newAppComplete =
spark git commit: [SPARK-17721][MLLIB][ML] Fix for multiplying transposed SparseMatrix with SparseVector
Repository: spark Updated Branches: refs/heads/master 4ecc648ad -> 29396e7d1 [SPARK-17721][MLLIB][ML] Fix for multiplying transposed SparseMatrix with SparseVector ## What changes were proposed in this pull request? * changes the implementation of gemv with transposed SparseMatrix and SparseVector both in mllib-local and mllib (identical) * adds a test that was failing before this change, but succeeds with these changes. The problem in the previous implementation was that it only increments `i`, that is enumerating the columns of a row in the SparseMatrix, when the row-index of the vector matches the column-index of the SparseMatrix. In cases where a particular row of the SparseMatrix has non-zero values at column-indices lower than corresponding non-zero row-indices of the SparseVector, the non-zero values of the SparseVector are enumerated without ever matching the column-index at index `i` and the remaining column-indices i+1,...,indEnd-1 are never attempted. The test cases in this PR illustrate this issue. ## How was this patch tested? I have run the specific `gemv` tests in both mllib-local and mllib. I am currently still running `./dev/run-tests`. ## ___ As per instructions, I hereby state that this is my original work and that I license the work to the project (Apache Spark) under the project's open source license. Mentioning dbtsai, viirya and brkyvz whom I can see have worked/authored on these parts before. Author: Bjarne FruergaardCloses #15296 from bwahlgreen/bugfix-spark-17721. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29396e7d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29396e7d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29396e7d Branch: refs/heads/master Commit: 29396e7d1483d027960b9a1bed47008775c4253e Parents: 4ecc648 Author: Bjarne Fruergaard Authored: Thu Sep 29 15:39:57 2016 -0700 Committer: Joseph K. 
Bradley Committed: Thu Sep 29 15:39:57 2016 -0700 -- .../scala/org/apache/spark/ml/linalg/BLAS.scala| 8 ++-- .../org/apache/spark/ml/linalg/BLASSuite.scala | 17 + .../scala/org/apache/spark/mllib/linalg/BLAS.scala | 8 ++-- .../org/apache/spark/mllib/linalg/BLASSuite.scala | 17 + 4 files changed, 46 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/29396e7d/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala -- diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index 41b0c6c..4ca19f3 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -638,12 +638,16 @@ private[spark] object BLAS extends Serializable { val indEnd = Arows(rowCounter + 1) var sum = 0.0 var k = 0 -while (k < xNnz && i < indEnd) { +while (i < indEnd && k < xNnz) { if (xIndices(k) == Acols(i)) { sum += Avals(i) * xValues(k) +k += 1 +i += 1 + } else if (xIndices(k) < Acols(i)) { +k += 1 + } else { i += 1 } - k += 1 } yValues(rowCounter) = sum * alpha + beta * yValues(rowCounter) rowCounter += 1 http://git-wip-us.apache.org/repos/asf/spark/blob/29396e7d/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala -- diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala index 8a9f497..6e72a5f 100644 --- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala +++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala @@ -392,6 +392,23 @@ class BLASSuite extends SparkMLFunSuite { } } +val y17 = new DenseVector(Array(0.0, 0.0)) +val y18 = y17.copy + +val sA3 = new SparseMatrix(3, 2, Array(0, 2, 4), Array(1, 2, 0, 1), Array(2.0, 1.0, 1.0, 2.0)) + .transpose +val sA4 = + new SparseMatrix(2, 3, Array(0, 1, 3, 4), Array(1, 0, 1, 0), 
Array(1.0, 2.0, 2.0, 1.0)) +val sx3 = new SparseVector(3, Array(1, 2), Array(2.0, 1.0)) + +val expected4 = new DenseVector(Array(5.0, 4.0)) + +gemv(1.0, sA3, sx3, 0.0, y17) +gemv(1.0, sA4, sx3, 0.0, y18) + +assert(y17 ~== expected4 absTol 1e-15) +assert(y18 ~== expected4 absTol 1e-15) + val dAT = new DenseMatrix(3, 4, Array(0.0, 2.0, 0.0, 1.0, 0.0, 0.0,
spark git commit: [SPARK-17721][MLLIB][ML] Fix for multiplying transposed SparseMatrix with SparseVector
Repository: spark Updated Branches: refs/heads/branch-2.0 7c9450b00 -> 0cdd7370a [SPARK-17721][MLLIB][ML] Fix for multiplying transposed SparseMatrix with SparseVector ## What changes were proposed in this pull request? * changes the implementation of gemv with transposed SparseMatrix and SparseVector both in mllib-local and mllib (identical) * adds a test that was failing before this change, but succeeds with these changes. The problem in the previous implementation was that it only increments `i`, that is enumerating the columns of a row in the SparseMatrix, when the row-index of the vector matches the column-index of the SparseMatrix. In cases where a particular row of the SparseMatrix has non-zero values at column-indices lower than corresponding non-zero row-indices of the SparseVector, the non-zero values of the SparseVector are enumerated without ever matching the column-index at index `i` and the remaining column-indices i+1,...,indEnd-1 are never attempted. The test cases in this PR illustrate this issue. ## How was this patch tested? I have run the specific `gemv` tests in both mllib-local and mllib. I am currently still running `./dev/run-tests`. ## ___ As per instructions, I hereby state that this is my original work and that I license the work to the project (Apache Spark) under the project's open source license. Mentioning dbtsai, viirya and brkyvz whom I can see have worked/authored on these parts before. Author: Bjarne FruergaardCloses #15296 from bwahlgreen/bugfix-spark-17721. (cherry picked from commit 29396e7d1483d027960b9a1bed47008775c4253e) Signed-off-by: Joseph K. 
Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cdd7370 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cdd7370 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cdd7370 Branch: refs/heads/branch-2.0 Commit: 0cdd7370a61618d042417ee387a3c32ee5c924e6 Parents: 7c9450b Author: Bjarne Fruergaard Authored: Thu Sep 29 15:39:57 2016 -0700 Committer: Joseph K. Bradley Committed: Thu Sep 29 15:40:08 2016 -0700 -- .../scala/org/apache/spark/ml/linalg/BLAS.scala| 8 ++-- .../org/apache/spark/ml/linalg/BLASSuite.scala | 17 + .../scala/org/apache/spark/mllib/linalg/BLAS.scala | 8 ++-- .../org/apache/spark/mllib/linalg/BLASSuite.scala | 17 + 4 files changed, 46 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0cdd7370/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala -- diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index 41b0c6c..4ca19f3 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -638,12 +638,16 @@ private[spark] object BLAS extends Serializable { val indEnd = Arows(rowCounter + 1) var sum = 0.0 var k = 0 -while (k < xNnz && i < indEnd) { +while (i < indEnd && k < xNnz) { if (xIndices(k) == Acols(i)) { sum += Avals(i) * xValues(k) +k += 1 +i += 1 + } else if (xIndices(k) < Acols(i)) { +k += 1 + } else { i += 1 } - k += 1 } yValues(rowCounter) = sum * alpha + beta * yValues(rowCounter) rowCounter += 1 http://git-wip-us.apache.org/repos/asf/spark/blob/0cdd7370/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala -- diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala index 8a9f497..6e72a5f 100644 --- 
a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala +++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala @@ -392,6 +392,23 @@ class BLASSuite extends SparkMLFunSuite { } } +val y17 = new DenseVector(Array(0.0, 0.0)) +val y18 = y17.copy + +val sA3 = new SparseMatrix(3, 2, Array(0, 2, 4), Array(1, 2, 0, 1), Array(2.0, 1.0, 1.0, 2.0)) + .transpose +val sA4 = + new SparseMatrix(2, 3, Array(0, 1, 3, 4), Array(1, 0, 1, 0), Array(1.0, 2.0, 2.0, 1.0)) +val sx3 = new SparseVector(3, Array(1, 2), Array(2.0, 1.0)) + +val expected4 = new DenseVector(Array(5.0, 4.0)) + +gemv(1.0, sA3, sx3, 0.0, y17) +gemv(1.0, sA4, sx3, 0.0, y18) + +assert(y17 ~== expected4
spark git commit: [SPARK-17612][SQL] Support `DESCRIBE table PARTITION` SQL syntax
Repository: spark Updated Branches: refs/heads/master 566d7f282 -> 4ecc648ad [SPARK-17612][SQL] Support `DESCRIBE table PARTITION` SQL syntax ## What changes were proposed in this pull request? This PR implements `DESCRIBE table PARTITION` SQL Syntax again. It was supported until Spark 1.6.2, but was dropped since 2.0.0. **Spark 1.6.2** ```scala scala> sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)") res1: org.apache.spark.sql.DataFrame = [result: string] scala> sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)") res2: org.apache.spark.sql.DataFrame = [result: string] scala> sql("DESC partitioned_table PARTITION (c='Us', d=1)").show(false) ++ |result | ++ |a string | |b int | |c string | |d string | || |# Partition Information | |# col_name data_type comment | || |c string | |d string | ++ ``` **Spark 2.0** - **Before** ```scala scala> sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)") res0: org.apache.spark.sql.DataFrame = [] scala> sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)") res1: org.apache.spark.sql.DataFrame = [] scala> sql("DESC partitioned_table PARTITION (c='Us', d=1)").show(false) org.apache.spark.sql.catalyst.parser.ParseException: Unsupported SQL statement ``` - **After** ```scala scala> sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)") res0: org.apache.spark.sql.DataFrame = [] scala> sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)") res1: org.apache.spark.sql.DataFrame = [] scala> sql("DESC partitioned_table PARTITION (c='Us', d=1)").show(false) +---+-+---+ |col_name |data_type|comment| +---+-+---+ |a |string |null | |b |int |null | |c |string |null | |d |string |null | |# Partition Information| | | |# col_name |data_type|comment| |c |string |null | |d |string |null | +---+-+---+ scala> sql("DESC EXTENDED partitioned_table PARTITION (c='Us', d=1)").show(100,false) +---+-+---+ 
|col_name |data_type|comment| +---+-+---+ |a
spark git commit: [SPARK-17653][SQL] Remove unnecessary distincts in multiple unions
Repository: spark Updated Branches: refs/heads/master fe33121a5 -> 566d7f282 [SPARK-17653][SQL] Remove unnecessary distincts in multiple unions ## What changes were proposed in this pull request? Currently for `Union [Distinct]`, a `Distinct` operator is necessary to be on the top of `Union`. Once there are adjacent `Union [Distinct]`, there will be multiple `Distinct` in the query plan. E.g., For a query like: select 1 a union select 2 b union select 3 c Before this patch, its physical plan looks like: *HashAggregate(keys=[a#13], functions=[]) +- Exchange hashpartitioning(a#13, 200) +- *HashAggregate(keys=[a#13], functions=[]) +- Union :- *HashAggregate(keys=[a#13], functions=[]) : +- Exchange hashpartitioning(a#13, 200) : +- *HashAggregate(keys=[a#13], functions=[]) :+- Union : :- *Project [1 AS a#13] : : +- Scan OneRowRelation[] : +- *Project [2 AS b#14] : +- Scan OneRowRelation[] +- *Project [3 AS c#15] +- Scan OneRowRelation[] Only the top distinct should be necessary. After this patch, the physical plan looks like: *HashAggregate(keys=[a#221], functions=[], output=[a#221]) +- Exchange hashpartitioning(a#221, 5) +- *HashAggregate(keys=[a#221], functions=[], output=[a#221]) +- Union :- *Project [1 AS a#221] : +- Scan OneRowRelation[] :- *Project [2 AS b#222] : +- Scan OneRowRelation[] +- *Project [3 AS c#223] +- Scan OneRowRelation[] ## How was this patch tested? Jenkins tests. Author: Liang-Chi HsiehCloses #15238 from viirya/remove-extra-distinct-union. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/566d7f28 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/566d7f28 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/566d7f28 Branch: refs/heads/master Commit: 566d7f28275f90f7b9bed6a75e90989ad0c59931 Parents: fe33121 Author: Liang-Chi Hsieh Authored: Thu Sep 29 14:30:23 2016 -0700 Committer: Herman van Hovell Committed: Thu Sep 29 14:30:23 2016 -0700 -- .../sql/catalyst/optimizer/Optimizer.scala | 24 ++- .../spark/sql/catalyst/planning/patterns.scala | 27 .../catalyst/optimizer/SetOperationSuite.scala | 68 3 files changed, 89 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/566d7f28/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 4952ba3..9df8ce1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.annotation.tailrec import scala.collection.immutable.HashSet +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.api.java.function.FilterFunction @@ -29,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} -import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions} +import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins import 
org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -579,8 +580,25 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe * Combines all adjacent [[Union]] operators into a single [[Union]]. */ object CombineUnions extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case Unions(children) => Union(children) + def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { +case u: Union => flattenUnion(u, false) +case Distinct(u: Union) => Distinct(flattenUnion(u, true)) + } + + private def flattenUnion(union: Union, flattenDistinct: Boolean): Union = { +val stack = mutable.Stack[LogicalPlan](union) +val flattened = mutable.ArrayBuffer.empty[LogicalPlan] +
spark git commit: [SPARK-17699] Support for parsing JSON string columns
Repository: spark Updated Branches: refs/heads/master 027dea8f2 -> fe33121a5 [SPARK-17699] Support for parsing JSON string columns Spark SQL has great support for reading text files that contain JSON data. However, in many cases the JSON data is just one column amongst others. This is particularly true when reading from sources such as Kafka. This PR adds a new functions `from_json` that converts a string column into a nested `StructType` with a user specified schema. Example usage: ```scala val df = Seq("""{"a": 1}""").toDS() val schema = new StructType().add("a", IntegerType) df.select(from_json($"value", schema) as 'json) // => [json: ] ``` This PR adds support for java, scala and python. I leveraged our existing JSON parsing support by moving it into catalyst (so that we could define expressions using it). I left SQL out for now, because I'm not sure how users would specify a schema. Author: Michael ArmbrustCloses #15274 from marmbrus/jsonParser. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe33121a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe33121a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe33121a Branch: refs/heads/master Commit: fe33121a53384811a8e094ab6c05dc85b7c7ca87 Parents: 027dea8 Author: Michael Armbrust Authored: Thu Sep 29 13:01:10 2016 -0700 Committer: Yin Huai Committed: Thu Sep 29 13:01:10 2016 -0700 -- python/pyspark/sql/functions.py | 23 + .../catalyst/expressions/jsonExpressions.scala | 31 +- .../spark/sql/catalyst/json/JSONOptions.scala | 84 .../spark/sql/catalyst/json/JacksonParser.scala | 443 +++ .../spark/sql/catalyst/json/JacksonUtils.scala | 32 ++ .../sql/catalyst/util/CompressionCodecs.scala | 72 +++ .../spark/sql/catalyst/util/ParseModes.scala| 41 ++ .../expressions/JsonExpressionsSuite.scala | 26 ++ .../org/apache/spark/sql/DataFrameReader.scala | 5 +- .../datasources/CompressionCodecs.scala | 72 --- 
.../sql/execution/datasources/ParseModes.scala | 41 -- .../datasources/csv/CSVFileFormat.scala | 1 + .../execution/datasources/csv/CSVOptions.scala | 2 +- .../datasources/json/InferSchema.scala | 3 +- .../datasources/json/JSONOptions.scala | 84 .../datasources/json/JacksonGenerator.scala | 3 +- .../datasources/json/JacksonParser.scala| 440 -- .../datasources/json/JacksonUtils.scala | 32 -- .../datasources/json/JsonFileFormat.scala | 2 + .../datasources/text/TextFileFormat.scala | 1 + .../scala/org/apache/spark/sql/functions.scala | 58 +++ .../apache/spark/sql/JsonFunctionsSuite.scala | 29 ++ .../json/JsonParsingOptionsSuite.scala | 1 + .../execution/datasources/json/JsonSuite.scala | 3 +- 24 files changed, 852 insertions(+), 677 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 89b3c07..45d6bf9 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1706,6 +1706,29 @@ def json_tuple(col, *fields): return Column(jc) +@since(2.1) +def from_json(col, schema, options={}): +""" +Parses a column containing a JSON string into a [[StructType]] with the +specified schema. Returns `null`, in the case of an unparseable string. + +:param col: string column in json format +:param schema: a StructType to use when parsing the json column +:param options: options to control parsing. 
accepts the same options as the json datasource + +>>> from pyspark.sql.types import * +>>> data = [(1, '''{"a": 1}''')] +>>> schema = StructType([StructField("a", IntegerType())]) +>>> df = spark.createDataFrame(data, ("key", "value")) +>>> df.select(from_json(df.value, schema).alias("json")).collect() +[Row(json=Row(a=1))] +""" + +sc = SparkContext._active_spark_context +jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options) +return Column(jc) + + @since(1.5) def size(col): """ http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index c14a2fb..65dbd6a 100644 ---
spark git commit: [SPARK-17715][SCHEDULER] Make task launch logs DEBUG
Repository: spark Updated Branches: refs/heads/master cb87b3ced -> 027dea8f2 [SPARK-17715][SCHEDULER] Make task launch logs DEBUG ## What changes were proposed in this pull request? Ramp down the task launch logs from INFO to DEBUG. Task launches can happen orders of magnitude more than executor registration so it makes the logs easier to handle if they are different log levels. For larger jobs, there can be 100,000s of task launches which makes the driver log huge. ## How was this patch tested? No tests, as this is a trivial change. Author: Brian ChoCloses #15290 from dafrista/ramp-down-task-logging. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/027dea8f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/027dea8f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/027dea8f Branch: refs/heads/master Commit: 027dea8f294504bc5cd8bfedde546d171cb78657 Parents: cb87b3c Author: Brian Cho Authored: Thu Sep 29 15:59:17 2016 -0400 Committer: Andrew Or Committed: Thu Sep 29 15:59:17 2016 -0400 -- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/027dea8f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 2d09863..0dae0e6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -265,7 +265,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK - 
logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + + logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + s"${executorData.executorHost}.") executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) - To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
spark git commit: [SPARK-17672] Spark 2.0 history server web UI takes too long for a single application
Repository: spark Updated Branches: refs/heads/branch-2.0 f7839e47c -> 7c9450b00 [SPARK-17672] Spark 2.0 history server web Ui takes too long for a single application Added a new API getApplicationInfo(appId: String) in class ApplicationHistoryProvider and class SparkUI to get app info. In this change, FsHistoryProvider can directly fetch one app info in O(1) time complexity compared to O(n) before the change which used an Iterator.find() interface. Both ApplicationCache and OneApplicationResource classes adopt this new api. manual tests Author: Gang WuCloses #15247 from wgtmac/SPARK-17671. (cherry picked from commit cb87b3ced9453b5717fa8e8637b97a2f3f25fdd7) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c9450b0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c9450b0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c9450b0 Branch: refs/heads/branch-2.0 Commit: 7c9450b007205958984f39a881415cdbe75e0c34 Parents: f7839e4 Author: Gang Wu Authored: Thu Sep 29 15:51:05 2016 -0400 Committer: Andrew Or Committed: Thu Sep 29 15:51:57 2016 -0400 -- .../spark/deploy/history/ApplicationHistoryProvider.scala | 5 + .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 .../scala/org/apache/spark/deploy/history/HistoryServer.scala | 4 .../scala/org/apache/spark/status/api/v1/ApiRootResource.scala | 1 + .../org/apache/spark/status/api/v1/OneApplicationResource.scala | 2 +- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 4 6 files changed, 19 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7c9450b0/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 44661ed..ba42b48 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -109,4 +109,9 @@ private[history] abstract class ApplicationHistoryProvider { @throws(classOf[SparkException]) def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit + /** + * @return the [[ApplicationHistoryInfo]] for the appId if it exists. + */ + def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] + } http://git-wip-us.apache.org/repos/asf/spark/blob/7c9450b0/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 110d882..cf4a401 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -222,6 +222,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values + override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = { +applications.get(appId) + } + override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { applications.get(appId).flatMap { appInfo => http://git-wip-us.apache.org/repos/asf/spark/blob/7c9450b0/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index c178917..735aa43 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -182,6 +182,10 @@ class HistoryServer( 
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) } + def getApplicationInfo(appId: String): Option[ApplicationInfo] = { + provider.getApplicationInfo(appId).map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) + } + override def writeEventLogs( appId: String, attemptId: Option[String],
spark git commit: [SPARK-17672] Spark 2.0 history server web UI takes too long for a single application
Repository: spark Updated Branches: refs/heads/master 7f779e743 -> cb87b3ced [SPARK-17672] Spark 2.0 history server web Ui takes too long for a single application Added a new API getApplicationInfo(appId: String) in class ApplicationHistoryProvider and class SparkUI to get app info. In this change, FsHistoryProvider can directly fetch one app info in O(1) time complexity compared to O(n) before the change which used an Iterator.find() interface. Both ApplicationCache and OneApplicationResource classes adopt this new api. manual tests Author: Gang WuCloses #15247 from wgtmac/SPARK-17671. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb87b3ce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb87b3ce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb87b3ce Branch: refs/heads/master Commit: cb87b3ced9453b5717fa8e8637b97a2f3f25fdd7 Parents: 7f779e7 Author: Gang Wu Authored: Thu Sep 29 15:51:05 2016 -0400 Committer: Andrew Or Committed: Thu Sep 29 15:51:38 2016 -0400 -- .../spark/deploy/history/ApplicationHistoryProvider.scala | 5 + .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 .../scala/org/apache/spark/deploy/history/HistoryServer.scala | 4 .../scala/org/apache/spark/status/api/v1/ApiRootResource.scala | 1 + .../org/apache/spark/status/api/v1/OneApplicationResource.scala | 2 +- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 4 6 files changed, 19 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb87b3ce/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 44661ed..ba42b48 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -109,4 +109,9 @@ private[history] abstract class ApplicationHistoryProvider { @throws(classOf[SparkException]) def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit + /** + * @return the [[ApplicationHistoryInfo]] for the appId if it exists. + */ + def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] + } http://git-wip-us.apache.org/repos/asf/spark/blob/cb87b3ce/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 6874aa5..d494ff0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -224,6 +224,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values + override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = { +applications.get(appId) + } + override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { applications.get(appId).flatMap { appInfo => http://git-wip-us.apache.org/repos/asf/spark/blob/cb87b3ce/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index c178917..735aa43 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -182,6 +182,10 @@ class HistoryServer( getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) 
} + def getApplicationInfo(appId: String): Option[ApplicationInfo] = { + provider.getApplicationInfo(appId).map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) + } + override def writeEventLogs( appId: String, attemptId: Option[String], http://git-wip-us.apache.org/repos/asf/spark/blob/cb87b3ce/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala -- diff --git
spark git commit: [SPARK-17648][CORE] TaskScheduler really needs offers to be an IndexedSeq
Repository: spark Updated Branches: refs/heads/master 958200497 -> 7f779e743 [SPARK-17648][CORE] TaskScheduler really needs offers to be an IndexedSeq ## What changes were proposed in this pull request? The Seq[WorkerOffer] is accessed by index, so it really should be an IndexedSeq, otherwise an O(n) operation becomes O(n^2). In practice this hasn't been an issue b/c where these offers are generated, the call to `.toSeq` just happens to create an IndexedSeq anyway.I got bitten by this in performance tests I was doing, and its better for the types to be more precise so eg. a change in Scala doesn't destroy performance. ## How was this patch tested? Unit tests via jenkins. Author: Imran RashidCloses #15221 from squito/SPARK-17648. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f779e74 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f779e74 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f779e74 Branch: refs/heads/master Commit: 7f779e7439127efa0e3611f7745e1c8423845198 Parents: 9582004 Author: Imran Rashid Authored: Thu Sep 29 15:36:40 2016 -0400 Committer: Andrew Or Committed: Thu Sep 29 15:36:40 2016 -0400 -- .../spark/scheduler/TaskSchedulerImpl.scala | 4 +-- .../cluster/CoarseGrainedSchedulerBackend.scala | 4 +-- .../scheduler/local/LocalSchedulerBackend.scala | 2 +- .../scheduler/SchedulerIntegrationSuite.scala | 7 ++--- .../scheduler/TaskSchedulerImplSuite.scala | 32 ++-- .../MesosFineGrainedSchedulerBackend.scala | 2 +- .../MesosFineGrainedSchedulerBackendSuite.scala | 2 +- 7 files changed, 26 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 52a7186..0ad4730 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -252,7 +252,7 @@ private[spark] class TaskSchedulerImpl( maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], - tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = { + tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { var launchedTask = false for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId @@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl( * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. */ - def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { + def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index edc3c19..2d09863 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -216,7 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val activeExecutors = executorDataMap.filterKeys(executorIsAlive) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) - }.toSeq + }.toIndexedSeq launchTasks(scheduler.resourceOffers(workOffers)) } @@ 
-233,7 +233,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Filter out executors under killing if (executorIsAlive(executorId)) { val executorData = executorDataMap(executorId) -val workOffers = Seq( +val workOffers = IndexedSeq( new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) launchTasks(scheduler.resourceOffers(workOffers)) }
spark git commit: [SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath aggregates
Repository: spark Updated Branches: refs/heads/branch-2.0 7ffafa3bf -> f7839e47c [SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath aggregates ## What changes were proposed in this pull request? This patch fixes a minor correctness issue impacting the pushdown of filters beneath aggregates. Specifically, if a filter condition references no grouping or aggregate columns (e.g. `WHERE false`) then it would be incorrectly pushed beneath an aggregate. Intuitively, the only case where you can push a filter beneath an aggregate is when that filter is deterministic and is defined over the grouping columns / expressions, since in that case the filter is acting to exclude entire groups from the query (like a `HAVING` clause). The existing code would only push deterministic filters beneath aggregates when all of the filter's references were grouping columns, but this logic missed the case where a filter has no references. For example, `WHERE false` is deterministic but is independent of the actual data. This patch fixes this minor bug by adding a new check to ensure that we don't push filters beneath aggregates when those filters don't reference any columns. ## How was this patch tested? New regression test in FilterPushdownSuite. Author: Josh RosenCloses #15289 from JoshRosen/SPARK-17712. 
(cherry picked from commit 37eb9184f1e9f1c07142c66936671f4711ef407d) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7839e47 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7839e47 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7839e47 Branch: refs/heads/branch-2.0 Commit: f7839e47c3bda86d61c3b2be72c168aab4a5674f Parents: 7ffafa3 Author: Josh Rosen Authored: Wed Sep 28 19:03:05 2016 -0700 Committer: Josh Rosen Committed: Thu Sep 29 12:05:46 2016 -0700 -- .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../catalyst/optimizer/FilterPushdownSuite.scala | 17 + 2 files changed, 18 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f7839e47/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 35b122d..4c06038 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1071,7 +1071,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { val (pushDown, rest) = candidates.partition { cond => val replaced = replaceAlias(cond, aliasMap) -replaced.references.subsetOf(aggregate.child.outputSet) +cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet) } val stayUp = rest ++ containingNonDeterministic http://git-wip-us.apache.org/repos/asf/spark/blob/f7839e47/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 55836f9..019f132 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -687,6 +687,23 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("SPARK-17712: aggregate: don't push down filters that are data-independent") { +val originalQuery = LocalRelation.apply(testRelation.output, Seq.empty) + .select('a, 'b) + .groupBy('a)(count('a)) + .where(false) + +val optimized = Optimize.execute(originalQuery.analyze) + +val correctAnswer = testRelation + .select('a, 'b) + .groupBy('a)(count('a)) + .where(false) + .analyze + +comparePlans(optimized, correctAnswer) + } + test("broadcast hint") { val originalQuery = BroadcastHint(testRelation) .where('a === 2L && 'b + Rand(10).as("rnd") === 3) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail:
spark git commit: [SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition.
Repository: spark Updated Branches: refs/heads/branch-2.0 ca8130050 -> 7ffafa3bf [SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition. ## What changes were proposed in this pull request? Currently our Optimizer may reorder the predicates to run them more efficiently, but in a non-deterministic condition, changing the order between deterministic parts and non-deterministic parts may change the number of input rows. For example: ```SELECT a FROM t WHERE rand() < 0.1 AND a = 1``` And ```SELECT a FROM t WHERE a = 1 AND rand() < 0.1``` may call rand() a different number of times and therefore the output rows differ. This PR improved this condition by checking whether the predicate is placed before any non-deterministic predicates. ## How was this patch tested? Expanded related testcases in FilterPushdownSuite. Author: 蒋星博 (jiangxb1987). Closes #14012 from jiangxb1987/ppd. (cherry picked from commit f376c37268848dbb4b2fb57677e22ef2bf207b49) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ffafa3b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ffafa3b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ffafa3b Branch: refs/heads/branch-2.0 Commit: 7ffafa3bfecb8bc92b79eddea1ca18166efd3385 Parents: ca81300 Author: 蒋星博 Authored: Thu Jul 14 00:21:27 2016 +0800 Committer: Josh Rosen Committed: Thu Sep 29 11:44:00 2016 -0700 -- .../sql/catalyst/optimizer/Optimizer.scala | 44 +--- .../optimizer/FilterPushdownSuite.scala | 8 ++-- 2 files changed, 33 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7ffafa3b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d824c2e..35b122d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1031,19 +1031,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be -// pushed beneath must satisfy the following two conditions: +// pushed beneath must satisfy the following conditions: // 1. All the expressions are part of window partitioning key. The expressions can be compound. -// 2. Deterministic +// 2. Deterministic. +// 3. Placed before any non-deterministic predicates. case filter @ Filter(condition, w: Window) if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => -cond.references.subsetOf(partitionAttrs) && cond.deterministic && - // This is for ensuring all the partitioning expressions have been converted to alias - // in Analyzer. Thus, we do not need to check if the expressions in conditions are - // the same as the expressions used in partitioning columns. 
- partitionAttrs.forall(_.isInstanceOf[Attribute]) + + val (candidates, containingNonDeterministic) = +splitConjunctivePredicates(condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => +cond.references.subsetOf(partitionAttrs) } + + val stayUp = rest ++ containingNonDeterministic + if (pushDown.nonEmpty) { val pushDownPredicate = pushDown.reduce(And) val newWindow = w.copy(child = Filter(pushDownPredicate, w.child)) @@ -1062,11 +1066,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // For each filter, expand the alias and check if the filter can be evaluated using // attributes produced by the aggregate operator's child operator. - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => + val (candidates, containingNonDeterministic) = +splitConjunctivePredicates(condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => val replaced = replaceAlias(cond, aliasMap) -
spark git commit: [DOCS] Reorganize explanation of Accumulators and Broadcast Variables
Repository: spark Updated Branches: refs/heads/master b2e9731ca -> 958200497 [DOCS] Reorganize explanation of Accumulators and Broadcast Variables ## What changes were proposed in this pull request? The discussion of the interaction of Accumulators and Broadcast Variables should logically follow the discussion on Checkpointing. As currently written, this section discusses Checkpointing before it is formally introduced. To remedy this: - Rename this section to "Accumulators, Broadcast Variables, and Checkpoints", and - Move this section after "Checkpointing". ## How was this patch tested? Testing: ran $ SKIP_API=1 jekyll build , and verified changes in a Web browser pointed at docs/_site/index.html. Author: José Hiram SoltrenCloses #15281 from jsoltren/doc-changes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95820049 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95820049 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95820049 Branch: refs/heads/master Commit: 958200497affb40f05e321c2b0e252d365ae02f4 Parents: b2e9731 Author: José Hiram Soltren Authored: Thu Sep 29 10:18:56 2016 -0700 Committer: Marcelo Vanzin Committed: Thu Sep 29 10:18:56 2016 -0700 -- docs/streaming-programming-guide.md | 328 +++ 1 file changed, 164 insertions(+), 164 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/95820049/docs/streaming-programming-guide.md -- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 43f1cf3..0b0315b 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1368,170 +1368,6 @@ Note that the connections in the pool should be lazily created on demand and tim *** -## Accumulators and Broadcast Variables - -[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. 
If you enable checkpointing and use [Accumulators](programming-guide.html#accumulators) or [Broadcast variables](programming-guide.html#broadcast-variables) as well, you'll have to create lazily instantiated singleton instances for [Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example. - - - -{% highlight scala %} - -object WordBlacklist { - - @volatile private var instance: Broadcast[Seq[String]] = null - - def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { -if (instance == null) { - synchronized { -if (instance == null) { - val wordBlacklist = Seq("a", "b", "c") - instance = sc.broadcast(wordBlacklist) -} - } -} -instance - } -} - -object DroppedWordsCounter { - - @volatile private var instance: LongAccumulator = null - - def getInstance(sc: SparkContext): LongAccumulator = { -if (instance == null) { - synchronized { -if (instance == null) { - instance = sc.longAccumulator("WordsInBlacklistCounter") -} - } -} -instance - } -} - -wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => - // Get or register the blacklist Broadcast - val blacklist = WordBlacklist.getInstance(rdd.sparkContext) - // Get or register the droppedWordsCounter Accumulator - val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) - // Use blacklist to drop words and use droppedWordsCounter to count them - val counts = rdd.filter { case (word, count) => -if (blacklist.value.contains(word)) { - droppedWordsCounter.add(count) - false -} else { - true -} - }.collect().mkString("[", ", ", "]") - val output = "Counts at time " + time + " " + counts -}) - -{% endhighlight %} - -See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala). 
- - -{% highlight java %} - -class JavaWordBlacklist { - - private static volatile Broadcast instance = null; - - public static Broadcast
getInstance(JavaSparkContext jsc) { -if (instance == null) { - synchronized (JavaWordBlacklist.class) { -if (instance == null) { - List wordBlacklist = Arrays.asList("a", "b", "c"); - instance = jsc.broadcast(wordBlacklist); -} - } -} -return instance; - } -} - -class JavaDroppedWordsCounter { - - private static volatile
spark git commit: [MINOR][DOCS] Fix the doc. of spark-streaming with kinesis
Repository: spark Updated Branches: refs/heads/branch-2.0 7d612a7d5 -> ca8130050 [MINOR][DOCS] Fix th doc. of spark-streaming with kinesis ## What changes were proposed in this pull request? This pr is just to fix the document of `spark-kinesis-integration`. Since `SPARK-17418` prevented all the kinesis stuffs (including kinesis example code) from publishing, `bin/run-example streaming.KinesisWordCountASL` and `bin/run-example streaming.JavaKinesisWordCountASL` does not work. Instead, it fetches the kinesis jar from the Spark Package. Author: Takeshi YAMAMUROCloses #15260 from maropu/DocFixKinesis. (cherry picked from commit b2e9731ca494c0c60d571499f68bb8306a3c9fe5) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca813005 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca813005 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca813005 Branch: refs/heads/branch-2.0 Commit: ca8130050964fac8baa568918f0b67c44a7a2518 Parents: 7d612a7 Author: Takeshi YAMAMURO Authored: Thu Sep 29 08:26:03 2016 -0400 Committer: Sean Owen Committed: Thu Sep 29 08:26:14 2016 -0400 -- docs/streaming-kinesis-integration.md | 9 +++-- 1 file changed, 3 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ca813005/docs/streaming-kinesis-integration.md -- diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index 96198dd..6be0b54 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -166,10 +166,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m Running the Example To run the example, -- Download Spark source and follow the [instructions](building-spark.html) to build Spark with profile *-Pkinesis-asl*. - -mvn -Pkinesis-asl -DskipTests clean package - +- Download a Spark binary from the [download site](http://spark.apache.org/downloads.html). 
- Set up Kinesis stream (see earlier section) within AWS. Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created. @@ -180,12 +177,12 @@ To run the example, -bin/run-example streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL] +bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL] -bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL] +bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][DOCS] Fix the doc. of spark-streaming with kinesis
Repository: spark Updated Branches: refs/heads/master b35b0dbbf -> b2e9731ca [MINOR][DOCS] Fix th doc. of spark-streaming with kinesis ## What changes were proposed in this pull request? This pr is just to fix the document of `spark-kinesis-integration`. Since `SPARK-17418` prevented all the kinesis stuffs (including kinesis example code) from publishing, `bin/run-example streaming.KinesisWordCountASL` and `bin/run-example streaming.JavaKinesisWordCountASL` does not work. Instead, it fetches the kinesis jar from the Spark Package. Author: Takeshi YAMAMUROCloses #15260 from maropu/DocFixKinesis. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2e9731c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2e9731c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2e9731c Branch: refs/heads/master Commit: b2e9731ca494c0c60d571499f68bb8306a3c9fe5 Parents: b35b0db Author: Takeshi YAMAMURO Authored: Thu Sep 29 08:26:03 2016 -0400 Committer: Sean Owen Committed: Thu Sep 29 08:26:03 2016 -0400 -- docs/streaming-kinesis-integration.md | 9 +++-- 1 file changed, 3 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b2e9731c/docs/streaming-kinesis-integration.md -- diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index 96198dd..6be0b54 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -166,10 +166,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m Running the Example To run the example, -- Download Spark source and follow the [instructions](building-spark.html) to build Spark with profile *-Pkinesis-asl*. - -mvn -Pkinesis-asl -DskipTests clean package - +- Download a Spark binary from the [download site](http://spark.apache.org/downloads.html). - Set up Kinesis stream (see earlier section) within AWS. 
Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created. @@ -180,12 +177,12 @@ To run the example, -bin/run-example streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL] +bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL] -bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL] +bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17614][SQL] sparkSession.read() .jdbc(***) use the sql syntax "where 1=0" that Cassandra does not support
Repository: spark Updated Branches: refs/heads/master f7082ac12 -> b35b0dbbf [SPARK-17614][SQL] sparkSession.read() .jdbc(***) use the sql syntax "where 1=0" that Cassandra does not support ## What changes were proposed in this pull request? Use dialect's table-exists query rather than hard-coded WHERE 1=0 query ## How was this patch tested? Existing tests. Author: Sean OwenCloses #15196 from srowen/SPARK-17614. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b35b0dbb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b35b0dbb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b35b0dbb Branch: refs/heads/master Commit: b35b0dbbfa3dc1bdf5e2fa1e9677d06635142b22 Parents: f7082ac Author: Sean Owen Authored: Thu Sep 29 08:24:34 2016 -0400 Committer: Sean Owen Committed: Thu Sep 29 08:24:34 2016 -0400 -- .../sql/execution/datasources/jdbc/JDBCRDD.scala | 6 ++ .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 15 ++- 2 files changed, 16 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b35b0dbb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index a7da29f..f10615e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -58,11 +58,11 @@ object JDBCRDD extends Logging { val dialect = JdbcDialects.get(url) val conn: Connection = JdbcUtils.createConnectionFactory(url, properties)() try { - val statement = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0") + val statement = conn.prepareStatement(dialect.getSchemaQuery(table)) try { val rs = statement.executeQuery() try { - return 
JdbcUtils.getSchema(rs, dialect) + JdbcUtils.getSchema(rs, dialect) } finally { rs.close() } @@ -72,8 +72,6 @@ object JDBCRDD extends Logging { } finally { conn.close() } - -throw new RuntimeException("This line is unreachable.") } /** http://git-wip-us.apache.org/repos/asf/spark/blob/b35b0dbb/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 3a6d5b7..8dd4b8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc import java.sql.Connection -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.sql.types._ /** @@ -100,6 +100,19 @@ abstract class JdbcDialect extends Serializable { } /** + * The SQL query that should be used to discover the schema of a table. It only needs to + * ensure that the result set has the same schema as the table, such as by calling + * "SELECT * ...". Dialects can override this method to return a query that works best in a + * particular database. + * @param table The name of the table. + * @return The SQL query to use for discovering the schema. + */ + @Since("2.1.0") + def getSchemaQuery(table: String): String = { +s"SELECT * FROM $table WHERE 1=0" + } + + /** * Override connection specific properties to run before a select is made. This is in place to * allow dialects that need special treatment to optimize behavior. * @param connection The connection object - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17704][ML][MLLIB] ChiSqSelector performance improvement.
Repository: spark Updated Branches: refs/heads/master a19a1bb59 -> f7082ac12 [SPARK-17704][ML][MLLIB] ChiSqSelector performance improvement. ## What changes were proposed in this pull request? Several performance improvement for ```ChiSqSelector```: 1, Keep ```selectedFeatures``` ordered ascendent. ```ChiSqSelectorModel.transform``` need ```selectedFeatures``` ordered to make prediction. We should sort it when training model rather than making prediction, since users usually train model once and use the model to do prediction multiple times. 2, When training ```fpr``` type ```ChiSqSelectorModel```, it's not necessary to sort the ChiSq test result by statistic. ## How was this patch tested? Existing unit tests. Author: Yanbo LiangCloses #15277 from yanboliang/spark-17704. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7082ac1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7082ac1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7082ac1 Branch: refs/heads/master Commit: f7082ac12518ae84d6d1d4b7330a9f12cf95e7c1 Parents: a19a1bb Author: Yanbo Liang Authored: Thu Sep 29 04:30:42 2016 -0700 Committer: Yanbo Liang Committed: Thu Sep 29 04:30:42 2016 -0700 -- .../spark/mllib/feature/ChiSqSelector.scala | 45 +--- project/MimaExcludes.scala | 3 -- 2 files changed, 30 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f7082ac1/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala index 0f7c6e8..706ce78 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala @@ -35,12 +35,24 @@ import org.apache.spark.sql.{Row, SparkSession} /** * Chi Squared selector model. 
* - * @param selectedFeatures list of indices to select (filter). + * @param selectedFeatures list of indices to select (filter). Must be ordered asc */ @Since("1.3.0") class ChiSqSelectorModel @Since("1.3.0") ( @Since("1.3.0") val selectedFeatures: Array[Int]) extends VectorTransformer with Saveable { + require(isSorted(selectedFeatures), "Array has to be sorted asc") + + protected def isSorted(array: Array[Int]): Boolean = { +var i = 1 +val len = array.length +while (i < len) { + if (array(i) < array(i-1)) return false + i += 1 +} +true + } + /** * Applies transformation on a vector. * @@ -57,22 +69,21 @@ class ChiSqSelectorModel @Since("1.3.0") ( * Preserves the order of filtered features the same as their indices are stored. * Might be moved to Vector as .slice * @param features vector - * @param filterIndices indices of features to filter + * @param filterIndices indices of features to filter, must be ordered asc */ private def compress(features: Vector, filterIndices: Array[Int]): Vector = { -val orderedIndices = filterIndices.sorted features match { case SparseVector(size, indices, values) => -val newSize = orderedIndices.length +val newSize = filterIndices.length val newValues = new ArrayBuilder.ofDouble val newIndices = new ArrayBuilder.ofInt var i = 0 var j = 0 var indicesIdx = 0 var filterIndicesIdx = 0 -while (i < indices.length && j < orderedIndices.length) { +while (i < indices.length && j < filterIndices.length) { indicesIdx = indices(i) - filterIndicesIdx = orderedIndices(j) + filterIndicesIdx = filterIndices(j) if (indicesIdx == filterIndicesIdx) { newIndices += j newValues += values(i) @@ -90,7 +101,7 @@ class ChiSqSelectorModel @Since("1.3.0") ( Vectors.sparse(newSize, newIndices.result(), newValues.result()) case DenseVector(values) => val values = features.toArray -Vectors.dense(orderedIndices.map(i => values(i))) +Vectors.dense(filterIndices.map(i => values(i))) case other => throw new UnsupportedOperationException( s"Only sparse and dense 
vectors are supported but got ${other.getClass}.") @@ -220,18 +231,22 @@ class ChiSqSelector @Since("2.1.0") () extends Serializable { @Since("1.3.0") def fit(data: RDD[LabeledPoint]): ChiSqSelectorModel = { val chiSqTestResult = Statistics.chiSqTest(data) - .zipWithIndex.sortBy { case (res, _) => -res.statistic } val features =
spark git commit: [SPARK-16356][FOLLOW-UP][ML] Enforce ML test of exception for local/distributed Dataset.
Repository: spark Updated Branches: refs/heads/master 37eb9184f -> a19a1bb59 [SPARK-16356][FOLLOW-UP][ML] Enforce ML test of exception for local/distributed Dataset. ## What changes were proposed in this pull request? #14035 added ```testImplicits``` to ML unit tests and promoted ```toDF()```, but left one minor issue at ```VectorIndexerSuite```. If we create the DataFrame by ```Seq(...).toDF()```, it will throw different error/exception compared with ```sc.parallelize(Seq(...)).toDF()``` for one of the test cases. After in-depth study, I found it was caused by different behavior of local and distributed Dataset if the UDF failed at ```assert```. If the data is local Dataset, it throws ```AssertionError``` directly; If the data is distributed Dataset, it throws ```SparkException``` which is the wrapper of ```AssertionError```. I think we should enforce this test to cover both case. ## How was this patch tested? Unit test. Author: Yanbo LiangCloses #15261 from yanboliang/spark-16356. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a19a1bb5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a19a1bb5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a19a1bb5 Branch: refs/heads/master Commit: a19a1bb59411177caaf99581e89098826b7d0c7b Parents: 37eb918 Author: Yanbo Liang Authored: Thu Sep 29 00:54:26 2016 -0700 Committer: Yanbo Liang Committed: Thu Sep 29 00:54:26 2016 -0700 -- .../apache/spark/ml/feature/VectorIndexerSuite.scala | 13 + 1 file changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a19a1bb5/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala index 4da1b13..b28ce2a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala 
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala @@ -88,9 +88,7 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext densePoints1 = densePoints1Seq.map(FeatureData).toDF() sparsePoints1 = sparsePoints1Seq.map(FeatureData).toDF() -// TODO: If we directly use `toDF` without parallelize, the test in -// "Throws error when given RDDs with different size vectors" is failed for an unknown reason. -densePoints2 = sc.parallelize(densePoints2Seq, 2).map(FeatureData).toDF() +densePoints2 = densePoints2Seq.map(FeatureData).toDF() sparsePoints2 = sparsePoints2Seq.map(FeatureData).toDF() badPoints = badPointsSeq.map(FeatureData).toDF() } @@ -121,10 +119,17 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext model.transform(densePoints1) // should work model.transform(sparsePoints1) // should work -intercept[SparkException] { +// If the data is local Dataset, it throws AssertionError directly. +intercept[AssertionError] { model.transform(densePoints2).collect() logInfo("Did not throw error when fit, transform were called on vectors of different lengths") } +// If the data is distributed Dataset, it throws SparkException +// which is the wrapper of AssertionError. +intercept[SparkException] { + model.transform(densePoints2.repartition(2)).collect() + logInfo("Did not throw error when fit, transform were called on vectors of different lengths") +} intercept[SparkException] { vectorIndexer.fit(badPoints) logInfo("Did not throw error when fitting vectors of different lengths in same RDD.") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org