spark git commit: [SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata
Repository: spark Updated Branches: refs/heads/branch-2.0 7026eb87e -> 5456a1b4f [SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata ## What changes were proposed in this pull request? This PR modifies StreamExecution so that it discards metadata for batches that have already been fully processed, using the purge method that was added as part of SPARK-17235. It is based on work by frreiss in #15067, with the test case and some typos fixed. ## How was this patch tested? A new test case in StreamingQuerySuite. The test case would fail without the changes in this pull request. Author: petermaxlee Author: frreiss Closes #15126 from petermaxlee/SPARK-17513. (cherry picked from commit be9d57fc9d8b10e4234c01c06ed43fd7dd12c07b) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5456a1b4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5456a1b4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5456a1b4 Branch: refs/heads/branch-2.0 Commit: 5456a1b4fcd85d0d7f2f1cc64e44967def0950bf Parents: 7026eb8 Author: petermaxlee Authored: Mon Sep 19 22:19:51 2016 -0700 Committer: Reynold Xin Committed: Mon Sep 19 22:19:58 2016 -0700 -- .../sql/execution/streaming/MetadataLog.scala | 1 + .../execution/streaming/StreamExecution.scala | 7 +++++++ .../sql/streaming/StreamingQuerySuite.scala | 24 ++++++++++++++++++++++++ 3 files changed, 32 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5456a1b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala index 78d6be1..9e2604c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming * - Allow the user to query the latest batch id. * - Allow the user to query the metadata object of a specified batch id. * - Allow the user to query metadata objects in a range of batch ids. + * - Allow the user to remove obsolete metadata */ trait MetadataLog[T] { http://git-wip-us.apache.org/repos/asf/spark/blob/5456a1b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5e1e5ee..b7587f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -290,6 +290,13 @@ class StreamExecution( assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") + + // Now that we have logged the new batch, no further processing will happen for + // the previous batch, and it is safe to discard the old metadata. + // Note that purge is exclusive, i.e. it purges everything before currentBatchId. + // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in + // flight at the same time), this cleanup logic will need to change.
+ offsetLog.purge(currentBatchId) } else { awaitBatchLock.lock() try { http://git-wip-us.apache.org/repos/asf/spark/blob/5456a1b4/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 9d58315..d3e2cab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbage collection") { +val inputData = MemoryStream[Int] +val mapped = inputData.toDS().map(6 / _)
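The exclusive semantics of purge are the crux of the patch: once batch N has been committed to the offset log, everything strictly before N is dead weight, but the entry for N itself must survive because it describes the batch now in flight. A minimal sketch of such a purge over a local directory, assuming (as Spark's HDFSMetadataLog does) that each batch is persisted in a file named after its id:

```scala
import java.io.File
import scala.util.Try

// Exclusive purge sketch: delete every batch file whose id is strictly less
// than thresholdBatchId; the threshold batch itself is kept, since it marks
// the batch currently being processed.
def purge(metadataDir: File, thresholdBatchId: Long): Unit = {
  val files = Option(metadataDir.listFiles()).getOrElse(Array.empty[File])
  for {
    file <- files
    batchId <- Try(file.getName.toLong).toOption // skip non-batch files
    if batchId < thresholdBatchId
  } file.delete()
}
```

This is also why the comment in the patch flags pipeline parallelism: with more than one batch in flight at a time, currentBatchId would no longer be a safe threshold.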
spark git commit: [SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata
Repository: spark Updated Branches: refs/heads/master 26145a5af -> be9d57fc9 [SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata ## What changes were proposed in this pull request? This PR modifies StreamExecution so that it discards metadata for batches that have already been fully processed, using the purge method that was added as part of SPARK-17235. It is based on work by frreiss in #15067, with the test case and some typos fixed. ## How was this patch tested? A new test case in StreamingQuerySuite. The test case would fail without the changes in this pull request. Author: petermaxlee Author: frreiss Closes #15126 from petermaxlee/SPARK-17513. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be9d57fc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be9d57fc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be9d57fc Branch: refs/heads/master Commit: be9d57fc9d8b10e4234c01c06ed43fd7dd12c07b Parents: 26145a5 Author: petermaxlee Authored: Mon Sep 19 22:19:51 2016 -0700 Committer: Reynold Xin Committed: Mon Sep 19 22:19:51 2016 -0700 -- .../sql/execution/streaming/MetadataLog.scala | 1 + .../execution/streaming/StreamExecution.scala | 7 +++++++ .../sql/streaming/StreamingQuerySuite.scala | 24 ++++++++++++++++++++++++ 3 files changed, 32 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/be9d57fc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala index 78d6be1..9e2604c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming * - Allow the user to query the latest batch id. * - Allow the user to query the metadata object of a specified batch id. * - Allow the user to query metadata objects in a range of batch ids. + * - Allow the user to remove obsolete metadata */ trait MetadataLog[T] { http://git-wip-us.apache.org/repos/asf/spark/blob/be9d57fc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index a1aae61..220f77d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -290,6 +290,13 @@ class StreamExecution( assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") + + // Now that we have logged the new batch, no further processing will happen for + // the previous batch, and it is safe to discard the old metadata. + // Note that purge is exclusive, i.e. it purges everything before currentBatchId. + // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in + // flight at the same time), this cleanup logic will need to change.
+ offsetLog.purge(currentBatchId) } else { awaitBatchLock.lock() try { http://git-wip-us.apache.org/repos/asf/spark/blob/be9d57fc/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 9d58315..d3e2cab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbage collection") { +val inputData = MemoryStream[Int] +val mapped = inputData.toDS().map(6 / _) + +// Run 3 batches, and then assert that only 1 metadata file is left at the end +// since the first 2
[1/3] spark git commit: [SPARK-17163][ML] Unified LogisticRegression interface
Repository: spark Updated Branches: refs/heads/master e719b1c04 -> 26145a5af http://git-wip-us.apache.org/repos/asf/spark/blob/26145a5a/mllib/src/test/scala/org/apache/spark/ml/classification/MultinomialLogisticRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/MultinomialLogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/MultinomialLogisticRegressionSuite.scala deleted file mode 100644 index 0913fe5..000 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/MultinomialLogisticRegressionSuite.scala +++ /dev/null @@ -1,1056 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ml.classification - -import scala.language.existentials - -import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.attribute.NominalAttribute -import org.apache.spark.ml.classification.LogisticRegressionSuite._ -import org.apache.spark.ml.feature.LabeledPoint -import org.apache.spark.ml.linalg._ -import org.apache.spark.ml.param.ParamsSuite -import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} -import org.apache.spark.ml.util.TestingUtils._ -import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.{DataFrame, Dataset, Row} - -class MultinomialLogisticRegressionSuite - extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { - - @transient var dataset: Dataset[_] = _ - @transient var multinomialDataset: DataFrame = _ - private val eps: Double = 1e-5 - - override def beforeAll(): Unit = { -super.beforeAll() - -dataset = { - val nPoints = 100 - val coefficients = Array( --0.57997, 0.912083, -0.371077, --0.16624, -0.84355, -0.048509) - - val xMean = Array(5.843, 3.057) - val xVariance = Array(0.6856, 0.1899) - - val testData = generateMultinomialLogisticInput( -coefficients, xMean, xVariance, addIntercept = true, nPoints, 42) - - val df = spark.createDataFrame(sc.parallelize(testData, 4)) - df.cache() - df -} - -multinomialDataset = { - val nPoints = 1 - val coefficients = Array( --0.57997, 0.912083, -0.371077, -0.819866, 2.688191, --0.16624, -0.84355, -0.048509, -0.301789, 4.170682) - - val xMean = Array(5.843, 3.057, 3.758, 1.199) - val xVariance = Array(0.6856, 0.1899, 3.116, 0.581) - - val testData = generateMultinomialLogisticInput( -coefficients, xMean, xVariance, addIntercept = true, nPoints, 42) - - val df = spark.createDataFrame(sc.parallelize(testData, 4)) - df.cache() - df -} - } - - /** - * Enable the ignored test to export the dataset into CSV format, - * so we can validate the training accuracy compared with R's glmnet package. 
- */ - ignore("export test data into CSV format") { -val rdd = multinomialDataset.rdd.map { case Row(label: Double, features: Vector) => - label + "," + features.toArray.mkString(",") -}.repartition(1) - rdd.saveAsTextFile("target/tmp/MultinomialLogisticRegressionSuite/multinomialDataset") - } - - test("params") { -ParamsSuite.checkParams(new MultinomialLogisticRegression) -val model = new MultinomialLogisticRegressionModel("mLogReg", - Matrices.dense(2, 1, Array(0.0, 0.0)), Vectors.dense(0.0, 0.0), 2) -ParamsSuite.checkParams(model) - } - - test("multinomial logistic regression: default params") { -val mlr = new MultinomialLogisticRegression -assert(mlr.getLabelCol === "label") -assert(mlr.getFeaturesCol === "features") -assert(mlr.getPredictionCol === "prediction") -assert(mlr.getRawPredictionCol === "rawPrediction") -assert(mlr.getProbabilityCol === "probability") -assert(!mlr.isDefined(mlr.weightCol)) -assert(!mlr.isDefined(mlr.thresholds)) -assert(mlr.getFitIntercept) -assert(mlr.getStandardization) -val model = mlr.fit(dataset) -model.transform(dataset) - .select("label", "probability", "prediction", "rawPrediction") - .collect() -assert(model.getFeaturesCol === "features") -assert(model.getPredictionCol
[2/3] spark git commit: [SPARK-17163][ML] Unified LogisticRegression interface
http://git-wip-us.apache.org/repos/asf/spark/blob/26145a5a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index a1b4853..2623759 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -22,28 +22,49 @@ import scala.language.existentials import scala.util.Random import scala.util.control.Breaks._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.classification.LogisticRegressionSuite._ -import org.apache.spark.ml.feature.{Instance, LabeledPoint} -import org.apache.spark.ml.linalg.{Vector, Vectors} +import org.apache.spark.ml.feature.LabeledPoint +import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, SparseMatrix, SparseVector, Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.functions.lit class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { - @transient var dataset: Dataset[_] = _ - @transient var binaryDataset: DataFrame = _ + @transient var smallBinaryDataset: Dataset[_] = _ + @transient var smallMultinomialDataset: Dataset[_] = _ + @transient var binaryDataset: Dataset[_] = _ + @transient var multinomialDataset: Dataset[_] = _ private val eps: Double = 1e-5 override def beforeAll(): Unit = { super.beforeAll() -dataset = spark.createDataFrame(generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42)) +smallBinaryDataset = + spark.createDataFrame(generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42)) + +smallMultinomialDataset = { + val nPoints = 100 + val coefficients = Array( +-0.57997, 0.912083, -0.371077, +-0.16624, -0.84355, -0.048509) + + val xMean = Array(5.843, 3.057) + val xVariance = Array(0.6856, 0.1899) + + val testData = generateMultinomialLogisticInput( +coefficients, xMean, xVariance, addIntercept = true, nPoints, 42) + + val df = spark.createDataFrame(sc.parallelize(testData, 4)) + df.cache() + df +} binaryDataset = { val nPoints = 1 @@ -57,6 +78,23 @@ class LogisticRegressionSuite spark.createDataFrame(sc.parallelize(testData, 4)) } + +multinomialDataset = { + val nPoints = 1 + val coefficients = Array( +-0.57997, 0.912083, -0.371077, -0.819866, 2.688191, +-0.16624, -0.84355, -0.048509, -0.301789, 4.170682) + + val xMean = Array(5.843, 3.057, 3.758, 1.199) + val xVariance = Array(0.6856, 0.1899, 3.116, 0.581) + + val testData = generateMultinomialLogisticInput( +coefficients, xMean, xVariance, addIntercept = true, nPoints, 42) + + val df = spark.createDataFrame(sc.parallelize(testData, 4)) + df.cache() + df +} } /** @@ -67,6 +105,9 @@ class LogisticRegressionSuite binaryDataset.rdd.map { case Row(label: Double, features: Vector) => label + "," + features.toArray.mkString(",") }.repartition(1).saveAsTextFile("target/tmp/LogisticRegressionSuite/binaryDataset") +multinomialDataset.rdd.map { case Row(label: Double, 
features: Vector) => + label + "," + features.toArray.mkString(",") + }.repartition(1).saveAsTextFile("target/tmp/LogisticRegressionSuite/multinomialDataset") } test("params") { @@ -82,11 +123,12 @@ class LogisticRegressionSuite assert(lr.getPredictionCol === "prediction") assert(lr.getRawPredictionCol === "rawPrediction") assert(lr.getProbabilityCol === "probability") +assert(lr.getFamily === "auto") assert(!lr.isDefined(lr.weightCol)) assert(lr.getFitIntercept) assert(lr.getStandardization) -val model = lr.fit(dataset) -model.transform(dataset) +val model = lr.fit(smallBinaryDataset) +model.transform(smallBinaryDataset) .select("label", "probability", "prediction", "rawPrediction") .collect() assert(model.getThreshold === 0.5) @@ -100,17 +142,17 @@ class LogisticRegressionSuite test("empty probabilityCol") { val lr = new LogisticRegression().setProbabilityCol("") -val model = lr.fit(dataset) +val model = lr.fit(smallBinaryDataset) assert(model.hasSummary) // Validate that we
[3/3] spark git commit: [SPARK-17163][ML] Unified LogisticRegression interface
[SPARK-17163][ML] Unified LogisticRegression interface ## What changes were proposed in this pull request? Merge `MultinomialLogisticRegression` into `LogisticRegression` and remove `MultinomialLogisticRegression`. Marked as WIP because we should discuss the coefficients API in the model. See discussion below. JIRA: [SPARK-17163](https://issues.apache.org/jira/browse/SPARK-17163) ## How was this patch tested? Merged test suites and added some new unit tests. ## Design ### Switching between binomial and multinomial We default to automatically detecting whether we should run binomial or multinomial lor. We expose a new parameter called `family` which defaults to auto. When "auto" is used, we run normal binomial lor with pivoting if there are 1 or 2 label classes. Otherwise, we run multinomial. If the user explicitly sets the family, then we abide by that setting. In the case where "binomial" is set but multiclass lor is detected, we throw an error. ### coefficients/intercept model API (TODO) This is the biggest design point remaining, IMO. We need to decide how to store the coefficients and intercepts in the model, and in turn how to expose them via the API. Two important points: * We must maintain compatibility with the old API, i.e. we must expose `def coefficients: Vector` and `def intercept: Double` * There are two separate cases: binomial lr where we have a single set of coefficients and a single intercept and multinomial lr where we have `numClasses` sets of coefficients and `numClasses` intercepts. Some options: 1. **Store the binomial coefficients as a `2 x numFeatures` matrix.** This means that we would center the model coefficients before storing them in the model. The BLOR algorithm gives `1 * numFeatures` coefficients, but we would convert them to `2 x numFeatures` coefficients before storing them, effectively doubling the storage in the model. This has the advantage that we can make the code cleaner (i.e. less `if (isMultinomial) ... else ...`) and we don't have to reason about the different cases as much. It has the disadvantage that we double the storage space and we could see small regressions at prediction time since there are 2x the number of operations in the prediction algorithms. Additionally, we still have to produce the uncentered coefficients/intercept via the API, so we will have to either ALSO store the uncentered version, or compute it in `def coefficients: Vector` every time. 2. **Store the binomial coefficients as a `1 x numFeatures` matrix.** We still store the coefficients as a matrix and the intercepts as a vector. When users call `coefficients` we return them a `Vector` that is backed by the same underlying array as the `coefficientMatrix`, so we don't duplicate any data. At prediction time, we use the old prediction methods that are specialized for binary LOR. The benefits here are that we don't store extra data, and we won't see any regressions in performance. The cost of this is that we have separate implementations for predict methods in the binary vs multiclass case. The duplicated code is really not very high, but it's still a bit messy. If we do decide to store the 2x coefficients, we would likely want to see some performance tests to understand the potential regressions. **Update:** We have chosen option 2 ### Threshold/thresholds (TODO) Currently, when `threshold` is set we clear whatever value is in `thresholds` and when `thresholds` is set we clear whatever value is in `threshold`. 
[SPARK-11543](https://issues.apache.org/jira/browse/SPARK-11543) was created to prefer thresholds over threshold. We should decide if we should implement this behavior now or if we want to do it in a separate JIRA. **Update:** Let's leave it for a follow-up PR. ## Follow up * Summary model for multiclass logistic regression [SPARK-17139](https://issues.apache.org/jira/browse/SPARK-17139) * Thresholds vs threshold [SPARK-11543](https://issues.apache.org/jira/browse/SPARK-11543) Author: sethah Closes #14834 from sethah/SPARK-17163. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26145a5a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26145a5a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26145a5a Branch: refs/heads/master Commit: 26145a5af9a88053c0eaf280206ca2621c8919f6 Parents: e719b1c Author: sethah Authored: Mon Sep 19 21:33:54 2016 -0700 Committer: DB Tsai Committed: Mon Sep 19 21:33:54 2016 -0700 -- .../ml/classification/LogisticRegression.scala | 476 +-- .../MultinomialLogisticRegression.scala | 632 - .../ProbabilisticClassifier.scala | 22 +- .../classification/LogisticRegression.scala | 6 +- .../LogisticRegressionSuite.scala
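For readers of the unified API, a short usage sketch. The toy dataset is made up for illustration; `setFamily`, `coefficientMatrix`, and `interceptVector` are the names introduced by this patch:

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("lor").getOrCreate()
import spark.implicits._

// Three label classes force the multinomial path even under family = "auto".
val training = Seq(
  (0.0, Vectors.dense(0.0, 1.1)),
  (1.0, Vectors.dense(2.0, 1.0)),
  (2.0, Vectors.dense(2.0, -1.5))
).toDF("label", "features")

val lr = new LogisticRegression()
  .setFamily("multinomial") // "auto" (default), "binomial", or "multinomial"
  .setMaxIter(10)
val model = lr.fit(training)

// Per option 2 above: multinomial fits expose a numClasses x numFeatures
// coefficient matrix and a vector of intercepts, while binomial fits keep the
// old coefficients: Vector / intercept: Double accessors backed by the same
// underlying data, so nothing is stored twice.
println(model.coefficientMatrix)
println(model.interceptVector)
```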
spark git commit: [SPARK-17160] Properly escape field names in code-generated error messages
Repository: spark Updated Branches: refs/heads/branch-2.0 c02bc926d -> 7026eb87e [SPARK-17160] Properly escape field names in code-generated error messages This patch addresses a corner-case escaping bug where field names which contain special characters were unsafely interpolated into error message string literals in generated Java code, leading to compilation errors. This patch addresses these issues by using `addReferenceObj` to store the error messages as string fields rather than inline string constants. Author: Josh RosenCloses #15156 from JoshRosen/SPARK-17160. (cherry picked from commit e719b1c045ba185d242d21bbfcdee2c84dafc587) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7026eb87 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7026eb87 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7026eb87 Branch: refs/heads/branch-2.0 Commit: 7026eb87e7d7799d2818334a2e191dc46987975f Parents: c02bc92 Author: Josh Rosen Authored: Mon Sep 19 20:20:36 2016 -0700 Committer: Josh Rosen Committed: Mon Sep 19 20:21:25 2016 -0700 -- .../apache/spark/sql/catalyst/expressions/misc.scala | 12 +--- .../sql/catalyst/expressions/objects/objects.scala | 12 .../sql/catalyst/expressions/CodeGenerationSuite.scala | 13 - 3 files changed, 29 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7026eb87/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 3692075..92f8fb8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -477,10 +477,13 @@ case class PrintToStderr(child: Expression) extends UnaryExpression { protected override def nullSafeEval(input: Any): Any = input + private val outputPrefix = s"Result of ${child.simpleString} is " + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val outputPrefixField = ctx.addReferenceObj("outputPrefix", outputPrefix) nullSafeCodeGen(ctx, ev, c => s""" - | System.err.println("Result of ${child.simpleString} is " + $c); + | System.err.println($outputPrefixField + $c); | ${ev.value} = $c; """.stripMargin) } @@ -501,10 +504,12 @@ case class AssertTrue(child: Expression) extends UnaryExpression with ImplicitCa override def prettyName: String = "assert_true" + private val errMsg = s"'${child.simpleString}' is not true!" 
+ override def eval(input: InternalRow) : Any = { val v = child.eval(input) if (v == null || java.lang.Boolean.FALSE.equals(v)) { - throw new RuntimeException(s"'${child.simpleString}' is not true!") + throw new RuntimeException(errMsg) } else { null } @@ -512,9 +517,10 @@ case class AssertTrue(child: Expression) extends UnaryExpression with ImplicitCa override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val eval = child.genCode(ctx) +val errMsgField = ctx.addReferenceObj("errMsg", errMsg) ExprCode(code = s"""${eval.code} |if (${eval.isNull} || !${eval.value}) { - | throw new RuntimeException("'${child.simpleString}' is not true."); + | throw new RuntimeException($errMsgField); |}""".stripMargin, isNull = "true", value = "null") } http://git-wip-us.apache.org/repos/asf/spark/blob/7026eb87/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 1cdda53..691edd5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -763,7 +763,10 @@ case class GetExternalRowField( override def eval(input: InternalRow): Any = throw new UnsupportedOperationException("Only code-generated evaluation is supported") + private val errMsg = s"The ${index}th field '$fieldName' of input row cannot be null." + override def doGenCode(ctx: CodegenContext, ev: ExprCode):
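A hypothetical field name makes the failure mode concrete: interpolated directly into a Java string literal in generated source, an embedded quote terminates the literal early and the generated class no longer compiles. Passing the message in as an object field, which is what `ctx.addReferenceObj` does in the patch, sidesteps the problem entirely:

```scala
// Hypothetical illustration only, not Spark code: a field name containing a
// double quote, spliced straight into generated Java source text.
val fieldName = "a\"b"
val generated =
  s"""throw new RuntimeException("The field '$fieldName' of input row cannot be null.");"""
println(generated)
// throw new RuntimeException("The field 'a"b' of input row cannot be null.");
//   ^ the stray quote ends the Java literal early: a compile error at codegen
//     time. The patch instead emits `throw new RuntimeException(errMsg);`,
//     where errMsg is a String field installed on the generated class.
```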
spark git commit: [SPARK-17160] Properly escape field names in code-generated error messages
Repository: spark Updated Branches: refs/heads/master d8104158a -> e719b1c04 [SPARK-17160] Properly escape field names in code-generated error messages This patch addresses a corner-case escaping bug where field names which contain special characters were unsafely interpolated into error message string literals in generated Java code, leading to compilation errors. This patch addresses these issues by using `addReferenceObj` to store the error messages as string fields rather than inline string constants. Author: Josh RosenCloses #15156 from JoshRosen/SPARK-17160. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e719b1c0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e719b1c0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e719b1c0 Branch: refs/heads/master Commit: e719b1c045ba185d242d21bbfcdee2c84dafc587 Parents: d810415 Author: Josh Rosen Authored: Mon Sep 19 20:20:36 2016 -0700 Committer: Josh Rosen Committed: Mon Sep 19 20:20:36 2016 -0700 -- .../apache/spark/sql/catalyst/expressions/misc.scala | 12 +--- .../sql/catalyst/expressions/objects/objects.scala | 12 .../sql/catalyst/expressions/CodeGenerationSuite.scala | 13 - 3 files changed, 29 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e719b1c0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 3692075..92f8fb8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -477,10 +477,13 @@ case class PrintToStderr(child: Expression) extends UnaryExpression { protected override def nullSafeEval(input: Any): Any = input + private val outputPrefix = s"Result of ${child.simpleString} is " + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val outputPrefixField = ctx.addReferenceObj("outputPrefix", outputPrefix) nullSafeCodeGen(ctx, ev, c => s""" - | System.err.println("Result of ${child.simpleString} is " + $c); + | System.err.println($outputPrefixField + $c); | ${ev.value} = $c; """.stripMargin) } @@ -501,10 +504,12 @@ case class AssertTrue(child: Expression) extends UnaryExpression with ImplicitCa override def prettyName: String = "assert_true" + private val errMsg = s"'${child.simpleString}' is not true!" 
+ override def eval(input: InternalRow) : Any = { val v = child.eval(input) if (v == null || java.lang.Boolean.FALSE.equals(v)) { - throw new RuntimeException(s"'${child.simpleString}' is not true!") + throw new RuntimeException(errMsg) } else { null } @@ -512,9 +517,10 @@ case class AssertTrue(child: Expression) extends UnaryExpression with ImplicitCa override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val eval = child.genCode(ctx) +val errMsgField = ctx.addReferenceObj("errMsg", errMsg) ExprCode(code = s"""${eval.code} |if (${eval.isNull} || !${eval.value}) { - | throw new RuntimeException("'${child.simpleString}' is not true."); + | throw new RuntimeException($errMsgField); |}""".stripMargin, isNull = "true", value = "null") } http://git-wip-us.apache.org/repos/asf/spark/blob/e719b1c0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 4da74a0..faf8fec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -938,7 +938,10 @@ case class GetExternalRowField( override def eval(input: InternalRow): Any = throw new UnsupportedOperationException("Only code-generated evaluation is supported") + private val errMsg = s"The ${index}th field '$fieldName' of input row cannot be null." + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val errMsgField = ctx.addReferenceObj("errMsg", errMsg) val row = child.genCode(ctx) val code = s"""
spark git commit: [SPARK-17100] [SQL] fix Python udf in filter on top of outer join
Repository: spark Updated Branches: refs/heads/master e06320626 -> d8104158a [SPARK-17100] [SQL] fix Python udf in filter on top of outer join ## What changes were proposed in this pull request? In optimizer, we try to evaluate the condition to see whether it's nullable or not, but some expressions are not evaluable, we should check that before evaluate it. ## How was this patch tested? Added regression tests. Author: Davies LiuCloses #15103 from davies/udf_join. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8104158 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8104158 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8104158 Branch: refs/heads/master Commit: d8104158a922d86dd4f00e50d5d7dddc7b777a21 Parents: e063206 Author: Davies Liu Authored: Mon Sep 19 13:24:16 2016 -0700 Committer: Davies Liu Committed: Mon Sep 19 13:24:16 2016 -0700 -- python/pyspark/sql/tests.py | 8 .../org/apache/spark/sql/catalyst/optimizer/joins.scala | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d8104158/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 1be0b72..c2171c2 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -328,6 +328,14 @@ class SQLTests(ReusedPySparkTestCase): [row] = self.spark.sql("SELECT double(add(1, 2)), add(double(2), 1)").collect() self.assertEqual(tuple(row), (6, 5)) +def test_udf_in_filter_on_top_of_outer_join(self): +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1)]) +right = self.spark.createDataFrame([Row(a=1)]) +df = left.join(right, on='a', how='left_outer') +df = df.withColumn('b', udf(lambda x: 'x')(df.a)) +self.assertEqual(df.filter('b = "x"').collect(), [Row(a=1, b='x')]) + def test_udf_without_arguments(self): self.spark.catalog.registerFunction("foo", lambda: "bar") [row] = self.spark.sql("SELECT foo()").collect() http://git-wip-us.apache.org/repos/asf/spark/blob/d8104158/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 1621bff..2626057 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -109,7 +109,9 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false val attributes = e.references.toSeq val emptyRow = new GenericInternalRow(attributes.length) -val v = BindReferences.bindReference(e, attributes).eval(emptyRow) +val boundE = BindReferences.bindReference(e, attributes) +if (boundE.find(_.isInstanceOf[Unevaluable]).isDefined) return false +val v = boundE.eval(emptyRow) v == null || v == false } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
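Some context on what the rule is doing when it "evaluates the condition": EliminateOuterJoin tests whether the filter predicate can ever be satisfied by a null-padded row by evaluating it against an all-null row; a Python UDF cannot be evaluated on the driver, hence the new `Unevaluable` guard. A self-contained sketch of the idea, using simplified stand-ins rather than Spark's real Expression classes:

```scala
// Simplified stand-ins for illustration; Spark's real classes differ.
sealed trait Expr { def eval(row: Map[String, Any]): Any }
case class Attr(name: String) extends Expr {
  def eval(row: Map[String, Any]): Any = row.getOrElse(name, null)
}
case class EqualsX(child: Expr) extends Expr {
  def eval(row: Map[String, Any]): Any = child.eval(row) match {
    case null => null // SQL three-valued logic: null input, null output
    case v    => v == "x"
  }
}

// Evaluate the predicate over an all-null row. If the result is null or
// false, null-padded rows from the outer side can never pass the filter,
// so the outer join may be rewritten to a stricter join.
def canFilterOutNull(e: Expr): Boolean = {
  val v = e.eval(Map.empty)
  v == null || v == false
}

println(canFilterOutNull(EqualsX(Attr("b")))) // true: null = "x" yields null
```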
spark git commit: [SPARK-17100] [SQL] fix Python udf in filter on top of outer join
Repository: spark Updated Branches: refs/heads/branch-2.0 fef3ec151 -> c02bc926d [SPARK-17100] [SQL] fix Python udf in filter on top of outer join ## What changes were proposed in this pull request? In optimizer, we try to evaluate the condition to see whether it's nullable or not, but some expressions are not evaluable, we should check that before evaluate it. ## How was this patch tested? Added regression tests. Author: Davies LiuCloses #15103 from davies/udf_join. (cherry picked from commit d8104158a922d86dd4f00e50d5d7dddc7b777a21) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c02bc926 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c02bc926 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c02bc926 Branch: refs/heads/branch-2.0 Commit: c02bc926d71c406109870740dcdba68785a2c5d2 Parents: fef3ec1 Author: Davies Liu Authored: Mon Sep 19 13:24:16 2016 -0700 Committer: Davies Liu Committed: Mon Sep 19 13:24:25 2016 -0700 -- python/pyspark/sql/tests.py | 8 .../org/apache/spark/sql/catalyst/optimizer/joins.scala | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c02bc926/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index d3634fc..1ec40ce 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -323,6 +323,14 @@ class SQLTests(ReusedPySparkTestCase): [row] = self.spark.sql("SELECT double(add(1, 2)), add(double(2), 1)").collect() self.assertEqual(tuple(row), (6, 5)) +def test_udf_in_filter_on_top_of_outer_join(self): +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1)]) +right = self.spark.createDataFrame([Row(a=1)]) +df = left.join(right, on='a', how='left_outer') +df = df.withColumn('b', udf(lambda x: 'x')(df.a)) +self.assertEqual(df.filter('b = "x"').collect(), [Row(a=1, b='x')]) + def test_udf_without_arguments(self): self.spark.catalog.registerFunction("foo", lambda: "bar") [row] = self.spark.sql("SELECT foo()").collect() http://git-wip-us.apache.org/repos/asf/spark/blob/c02bc926/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 158ad3d..ae4cd8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -104,7 +104,9 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false val attributes = e.references.toSeq val emptyRow = new GenericInternalRow(attributes.length) -val v = BindReferences.bindReference(e, attributes).eval(emptyRow) +val boundE = BindReferences.bindReference(e, attributes) +if (boundE.find(_.isInstanceOf[Unevaluable]).isDefined) return false +val v = boundE.eval(emptyRow) v == null || v == false } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16439] [SQL] bring back the separator in SQL UI
Repository: spark Updated Branches: refs/heads/branch-2.0 d6191a067 -> fef3ec151 [SPARK-16439] [SQL] bring back the separator in SQL UI ## What changes were proposed in this pull request? Currently, the SQL metrics looks like `number of rows: `, it's very hard to read how large the number is. So a separator was added by #12425, but removed by #14142, because the separator is weird in some locales (for example, pl_PL), this PR will add that back, but always use "," as the separator, since the SQL UI are all in English. ## How was this patch tested? Existing tests. ![metrics](https://cloud.githubusercontent.com/assets/40902/14573908/21ad2f00-030d-11e6-9e2c-c544f30039ea.png) Author: Davies LiuCloses #15106 from davies/metric_sep. (cherry picked from commit e0632062635c37cbc77df7ebd2a1846655193e12) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fef3ec15 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fef3ec15 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fef3ec15 Branch: refs/heads/branch-2.0 Commit: fef3ec151a67348ce05fbbec95b74a0a4fe1aa4b Parents: d6191a0 Author: Davies Liu Authored: Mon Sep 19 11:49:03 2016 -0700 Committer: Davies Liu Committed: Mon Sep 19 11:49:34 2016 -0700 -- .../scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fef3ec15/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 15afa0b..0cc1edd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.metric import java.text.NumberFormat +import java.util.Locale import org.apache.spark.SparkContext import org.apache.spark.scheduler.AccumulableInfo @@ -101,8 +102,7 @@ object SQLMetrics { */ def stringValue(metricsType: String, values: Seq[Long]): String = { if (metricsType == SUM_METRIC) { - val numberFormat = NumberFormat.getInstance() - numberFormat.setGroupingUsed(false) + val numberFormat = NumberFormat.getIntegerInstance(Locale.ENGLISH) numberFormat.format(values.sum) } else { val strFormat: Long => String = if (metricsType == SIZE_METRIC) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
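The locale pinning is the whole fix; a few lines show why. In pl_PL the default integer formatter groups digits with (non-breaking) spaces, which looked broken in the UI, while the English instance always groups with a comma:

```scala
import java.text.NumberFormat
import java.util.Locale

// The English integer instance groups with "," regardless of the JVM's
// default locale, matching the UI's English text.
val nf = NumberFormat.getIntegerInstance(Locale.ENGLISH)
println(nf.format(12345678L)) // 12,345,678

// By contrast, the pl_PL instance groups with non-breaking spaces:
println(NumberFormat.getIntegerInstance(new Locale("pl", "PL")).format(12345678L))
```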
spark git commit: [SPARK-16439] [SQL] bring back the separator in SQL UI
Repository: spark Updated Branches: refs/heads/master 80d665592 -> e06320626 [SPARK-16439] [SQL] bring back the separator in SQL UI ## What changes were proposed in this pull request? Currently, the SQL metrics looks like `number of rows: `, it's very hard to read how large the number is. So a separator was added by #12425, but removed by #14142, because the separator is weird in some locales (for example, pl_PL), this PR will add that back, but always use "," as the separator, since the SQL UI are all in English. ## How was this patch tested? Existing tests. ![metrics](https://cloud.githubusercontent.com/assets/40902/14573908/21ad2f00-030d-11e6-9e2c-c544f30039ea.png) Author: Davies LiuCloses #15106 from davies/metric_sep. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0632062 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0632062 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0632062 Branch: refs/heads/master Commit: e0632062635c37cbc77df7ebd2a1846655193e12 Parents: 80d6655 Author: Davies Liu Authored: Mon Sep 19 11:49:03 2016 -0700 Committer: Davies Liu Committed: Mon Sep 19 11:49:03 2016 -0700 -- .../scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e0632062/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 15afa0b..0cc1edd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.metric import java.text.NumberFormat +import java.util.Locale import org.apache.spark.SparkContext import org.apache.spark.scheduler.AccumulableInfo @@ -101,8 +102,7 @@ object SQLMetrics { */ def stringValue(metricsType: String, values: Seq[Long]): String = { if (metricsType == SUM_METRIC) { - val numberFormat = NumberFormat.getInstance() - numberFormat.setGroupingUsed(false) + val numberFormat = NumberFormat.getIntegerInstance(Locale.ENGLISH) numberFormat.format(values.sum) } else { val strFormat: Long => String = if (metricsType == SIZE_METRIC) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17438][WEBUI] Show Application.executorLimit in the application page
Repository: spark Updated Branches: refs/heads/branch-2.0 f56035ba6 -> d6191a067 [SPARK-17438][WEBUI] Show Application.executorLimit in the application page ## What changes were proposed in this pull request? This PR adds `Application.executorLimit` to the applicatino page ## How was this patch tested? Checked the UI manually. Screenshots: 1. Dynamic allocation is disabled https://cloud.githubusercontent.com/assets/1000778/18332029/210056ea-7518-11e6-9f52-76d96046c1c0.png;> 2. Dynamic allocation is enabled. https://cloud.githubusercontent.com/assets/1000778/18332034/2c07700a-7518-11e6-8fce-aebe25014902.png;> Author: Shixiong ZhuCloses #15001 from zsxwing/fix-core-info. (cherry picked from commit 80d6655921bea9b1bb27c1d95c2b46654e7a8cca) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6191a06 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6191a06 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6191a06 Branch: refs/heads/branch-2.0 Commit: d6191a0671effe32f5c07397679c17a62e1cdaff Parents: f56035b Author: Shixiong Zhu Authored: Mon Sep 19 14:00:42 2016 -0400 Committer: Andrew Or Committed: Mon Sep 19 14:01:02 2016 -0400 -- .../apache/spark/deploy/master/ui/ApplicationPage.scala | 12 +++- core/src/main/scala/org/apache/spark/ui/ToolTips.scala | 6 ++ 2 files changed, 17 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d6191a06/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 8875fc2..18c5d0b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -24,7 +24,7 @@ import scala.xml.Node import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.ExecutorState import org.apache.spark.deploy.master.ExecutorDesc -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") { @@ -70,6 +70,16 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") } + +Executor Limit: +{ + if (app.executorLimit == Int.MaxValue) "Unlimited" else app.executorLimit +} +({app.executors.size} granted) + + + Executor Memory: {Utils.megabytesToString(app.desc.memoryPerExecutorMB)} http://git-wip-us.apache.org/repos/asf/spark/blob/d6191a06/core/src/main/scala/org/apache/spark/ui/ToolTips.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala index 2d2d80b..3cc5353 100644 --- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -90,4 +90,10 @@ private[spark] object ToolTips { val TASK_TIME = "Shaded red when garbage collection (GC) time is over 10% of task time" + + val APPLICATION_EXECUTOR_LIMIT = +"""Maximum number of executors that this application will use. This limit is finite only when + dynamic allocation is enabled. The number of granted executors may exceed the limit + ephemerally when executors are being killed. 
+""" } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17438][WEBUI] Show Application.executorLimit in the application page
Repository: spark Updated Branches: refs/heads/master cdea1d134 -> 80d665592 [SPARK-17438][WEBUI] Show Application.executorLimit in the application page ## What changes were proposed in this pull request? This PR adds `Application.executorLimit` to the applicatino page ## How was this patch tested? Checked the UI manually. Screenshots: 1. Dynamic allocation is disabled https://cloud.githubusercontent.com/assets/1000778/18332029/210056ea-7518-11e6-9f52-76d96046c1c0.png;> 2. Dynamic allocation is enabled. https://cloud.githubusercontent.com/assets/1000778/18332034/2c07700a-7518-11e6-8fce-aebe25014902.png;> Author: Shixiong ZhuCloses #15001 from zsxwing/fix-core-info. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80d66559 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80d66559 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80d66559 Branch: refs/heads/master Commit: 80d6655921bea9b1bb27c1d95c2b46654e7a8cca Parents: cdea1d1 Author: Shixiong Zhu Authored: Mon Sep 19 14:00:42 2016 -0400 Committer: Andrew Or Committed: Mon Sep 19 14:00:42 2016 -0400 -- .../apache/spark/deploy/master/ui/ApplicationPage.scala | 12 +++- core/src/main/scala/org/apache/spark/ui/ToolTips.scala | 6 ++ 2 files changed, 17 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/80d66559/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 17c521c..18cff31 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -24,7 +24,7 @@ import scala.xml.Node import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.ExecutorState import org.apache.spark.deploy.master.ExecutorDesc -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") { @@ -70,6 +70,16 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") } + +Executor Limit: +{ + if (app.executorLimit == Int.MaxValue) "Unlimited" else app.executorLimit +} +({app.executors.size} granted) + + + Executor Memory: {Utils.megabytesToString(app.desc.memoryPerExecutorMB)} http://git-wip-us.apache.org/repos/asf/spark/blob/80d66559/core/src/main/scala/org/apache/spark/ui/ToolTips.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala index 2d2d80b..3cc5353 100644 --- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -90,4 +90,10 @@ private[spark] object ToolTips { val TASK_TIME = "Shaded red when garbage collection (GC) time is over 10% of task time" + + val APPLICATION_EXECUTOR_LIMIT = +"""Maximum number of executors that this application will use. This limit is finite only when + dynamic allocation is enabled. The number of granted executors may exceed the limit + ephemerally when executors are being killed. +""" } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
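The page's rendering of the limit is a single conditional; restated here as a standalone helper (hypothetical, not Spark code) to make the sentinel convention explicit:

```scala
// The master reports Int.MaxValue as the executor limit when dynamic
// allocation is off; the application page renders that sentinel as
// "Unlimited" alongside the number of executors actually granted.
def executorLimitText(executorLimit: Int, granted: Int): String = {
  val limit = if (executorLimit == Int.MaxValue) "Unlimited" else executorLimit.toString
  s"$limit ($granted granted)"
}

println(executorLimitText(Int.MaxValue, 8)) // Unlimited (8 granted)
```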
spark git commit: [SPARK-17473][SQL] fixing docker integration tests error due to different versions of jars.
Repository: spark Updated Branches: refs/heads/branch-2.0 c4660d607 -> f56035ba6 [SPARK-17473][SQL] fixing docker integration tests error due to different versions of jars. ## What changes were proposed in this pull request? Docker tests are using older version of jersey jars (1.19), which was used in older releases of spark. In 2.0 releases Spark was upgraded to use 2.x verison of Jersey. After upgrade to new versions, docker tests are failing with AbstractMethodError. Now that spark is upgraded to 2.x jersey version, using of shaded docker jars may not be required any more. Removed the exclusions/overrides of jersey related classes from pom file, and changed the docker-client to use regular jar instead of shaded one. ## How was this patch tested? Tested using existing docker-integration-tests Author: sureshthalamatiCloses #15114 from sureshthalamati/docker_testfix-spark-17473. (cherry picked from commit cdea1d1343d02f0077e1f3c92ca46d04a3d30414) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f56035ba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f56035ba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f56035ba Branch: refs/heads/branch-2.0 Commit: f56035ba6c86fe93a45fd437f98f812431df0069 Parents: c4660d6 Author: sureshthalamati Authored: Mon Sep 19 09:56:16 2016 -0700 Committer: Josh Rosen Committed: Mon Sep 19 10:29:57 2016 -0700 -- external/docker-integration-tests/pom.xml | 68 -- pom.xml | 1 - 2 files changed, 69 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f56035ba/external/docker-integration-tests/pom.xml -- diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml index 18e14c7..0ca94e5 100644 --- a/external/docker-integration-tests/pom.xml +++ b/external/docker-integration-tests/pom.xml @@ -49,38 +49,7 @@ com.spotify docker-client - shaded test - - - - com.fasterxml.jackson.jaxrs - jackson-jaxrs-json-provider - - - com.fasterxml.jackson.datatype - jackson-datatype-guava - - - com.fasterxml.jackson.core - jackson-databind - - - org.glassfish.jersey.core - jersey-client - - - org.glassfish.jersey.connectors - jersey-apache-connector - - - org.glassfish.jersey.media - jersey-media-json-jackson - - org.apache.httpcomponents @@ -152,43 +121,6 @@ test - - - com.sun.jersey - jersey-server - 1.19 - test - - - com.sun.jersey - jersey-core - 1.19 - test - - - com.sun.jersey - jersey-servlet - 1.19 - test - - - com.sun.jersey - jersey-json - 1.19 - test - - - stax - stax-api - - - - -
spark git commit: [SPARK-17589][TEST][2.0] Fix test case `create external table` in MetastoreDataSourcesSuite
Repository: spark Updated Branches: refs/heads/branch-2.0 ac060397c -> c4660d607 [SPARK-17589][TEST][2.0] Fix test case `create external table` in MetastoreDataSourcesSuite ### What changes were proposed in this pull request? This PR fixes a test failure on the branch-2.0 builds: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.0-test-maven-hadoop-2.7/711/ ``` Error Message "Table `default`.`createdJsonTable` already exists.;" did not contain "Table default.createdJsonTable already exists." We should complain that createdJsonTable already exists ``` ### How was this patch tested? N/A Author: gatorsmile Closes #15145 from gatorsmile/fixTestCase. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4660d60 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4660d60 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4660d60 Branch: refs/heads/branch-2.0 Commit: c4660d607fbeacc9bdbe2bb1293e4401d19a4bd5 Parents: ac06039 Author: gatorsmile Authored: Mon Sep 19 10:21:33 2016 -0700 Committer: Yin Huai Committed: Mon Sep 19 10:21:33 2016 -0700 -- .../spark/sql/execution/command/createDataSourceTables.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c4660d60/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 06965ff..f282d54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -82,7 +82,7 @@ case class CreateDataSourceTableCommand( if (ignoreIfExists) { return Seq.empty[Row] } else { -throw new AnalysisException(s"Table ${tableIdentWithDB.quotedString} already exists.") +throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.") } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
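The one-word diff hinges on the two renderings that catalyst's TableIdentifier offers; a quick sketch of the difference:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

val table = TableIdentifier("createdJsonTable", Some("default"))
println(table.quotedString)   // `default`.`createdJsonTable`
println(table.unquotedString) // default.createdJsonTable
// The test asserts on the unquoted form, so the error message must use it.
```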
spark git commit: [SPARK-17473][SQL] fixing docker integration tests error due to different versions of jars.
Repository: spark Updated Branches: refs/heads/master d720a4019 -> cdea1d134 [SPARK-17473][SQL] fixing docker integration tests error due to different versions of jars. ## What changes were proposed in this pull request? Docker tests are using older version of jersey jars (1.19), which was used in older releases of spark. In 2.0 releases Spark was upgraded to use 2.x verison of Jersey. After upgrade to new versions, docker tests are failing with AbstractMethodError. Now that spark is upgraded to 2.x jersey version, using of shaded docker jars may not be required any more. Removed the exclusions/overrides of jersey related classes from pom file, and changed the docker-client to use regular jar instead of shaded one. ## How was this patch tested? Tested using existing docker-integration-tests Author: sureshthalamatiCloses #15114 from sureshthalamati/docker_testfix-spark-17473. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cdea1d13 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cdea1d13 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cdea1d13 Branch: refs/heads/master Commit: cdea1d1343d02f0077e1f3c92ca46d04a3d30414 Parents: d720a40 Author: sureshthalamati Authored: Mon Sep 19 09:56:16 2016 -0700 Committer: Josh Rosen Committed: Mon Sep 19 09:56:16 2016 -0700 -- external/docker-integration-tests/pom.xml | 68 -- pom.xml | 1 - 2 files changed, 69 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cdea1d13/external/docker-integration-tests/pom.xml -- diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml index 7417199..57d553b 100644 --- a/external/docker-integration-tests/pom.xml +++ b/external/docker-integration-tests/pom.xml @@ -49,38 +49,7 @@ com.spotify docker-client - shaded test - - - - com.fasterxml.jackson.jaxrs - jackson-jaxrs-json-provider - - - com.fasterxml.jackson.datatype - jackson-datatype-guava - - - com.fasterxml.jackson.core - jackson-databind - - - org.glassfish.jersey.core - jersey-client - - - org.glassfish.jersey.connectors - jersey-apache-connector - - - org.glassfish.jersey.media - jersey-media-json-jackson - - org.apache.httpcomponents @@ -152,43 +121,6 @@ test - - - com.sun.jersey - jersey-server - 1.19 - test - - - com.sun.jersey - jersey-core - 1.19 - test - - - com.sun.jersey - jersey-servlet - 1.19 - test - - - com.sun.jersey - jersey-json - 1.19 - test - - - stax - stax-api - - - - -
spark git commit: [SPARK-17297][DOCS] Clarify window/slide duration as absolute time, not relative to a calendar
Repository: spark
Updated Branches: refs/heads/branch-2.0 27ce39cf2 -> ac060397c

[SPARK-17297][DOCS] Clarify window/slide duration as absolute time, not relative to a calendar

## What changes were proposed in this pull request?
Clarify that slide and window duration are absolute, and not relative to a calendar.

## How was this patch tested?
Doc build (no functional change)

Author: Sean Owen

Closes #15142 from srowen/SPARK-17297.

(cherry picked from commit d720a4019460b6c284d0473249303c349df60a1f)
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac060397
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac060397
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac060397

Branch: refs/heads/branch-2.0
Commit: ac060397c109158e84a2b57355c8dee5ab24ab7b
Parents: 27ce39c
Author: Sean Owen
Authored: Mon Sep 19 09:38:25 2016 +0100
Committer: Sean Owen
Committed: Mon Sep 19 09:38:36 2016 +0100
--
 R/pkg/R/functions.R                                 |  8 ++--
 .../main/scala/org/apache/spark/sql/functions.scala | 15 +++
 2 files changed, 17 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ac060397/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index ceedbe7..4d94b4c 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2713,11 +2713,15 @@ setMethod("from_unixtime", signature(x = "Column"),
 #' @param x a time Column. Must be of TimestampType.
 #' @param windowDuration a string specifying the width of the window, e.g. '1 second',
 #'                       '1 day 12 hours', '2 minutes'. Valid interval strings are 'week',
-#'                       'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
+#'                       'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. Note that
+#'                       the duration is a fixed length of time, and does not vary over time
+#'                       according to a calendar. For example, '1 day' always means 86,400,000
+#'                       milliseconds, not a calendar day.
 #' @param slideDuration a string specifying the sliding interval of the window. Same format as
 #'                      \code{windowDuration}. A new window will be generated every
 #'                      \code{slideDuration}. Must be less than or equal to
-#'                      the \code{windowDuration}.
+#'                      the \code{windowDuration}. This duration is likewise absolute, and does not
+#'                      vary according to a calendar.
 #' @param startTime the offset with respect to 1970-01-01 00:00:00 UTC with which to start
 #'                  window intervals. For example, in order to have hourly tumbling windows
 #'                  that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide

http://git-wip-us.apache.org/repos/asf/spark/blob/ac060397/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 4e185b8..eb504c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2596,12 +2596,15 @@ object functions {
    * The time column must be of TimestampType.
    * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
    *                       `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
-   *                       valid duration identifiers.
+   *                       valid duration identifiers. Note that the duration is a fixed length of
+   *                       time, and does not vary over time according to a calendar. For example,
+   *                       `1 day` always means 86,400,000 milliseconds, not a calendar day.
    * @param slideDuration A string specifying the sliding interval of the window, e.g. `1 minute`.
    *                      A new window will be generated every `slideDuration`. Must be less than
    *                      or equal to the `windowDuration`. Check
    *                      [[org.apache.spark.unsafe.types.CalendarInterval]] for valid duration
-   *                      identifiers.
+   *                      identifiers. This duration is likewise absolute, and does not vary
+   *                      according to a calendar.
    * @param startTime The offset with respect to 1970-01-01 00:00:00 UTC with which to start
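To make the "absolute duration" point concrete: in the sketch below, every window spans exactly 86,400 seconds of wall-clock time regardless of daylight-saving transitions, and the slide is a fixed hour. This is a minimal runnable example assuming a DataFrame with a TimestampType column named `ts`; the column and object names are hypothetical, not taken from the patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

object WindowDurationDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("window-demo").getOrCreate()
  import spark.implicits._

  // Hypothetical event data; "ts" is cast to a TimestampType column.
  val events = Seq(
    ("2016-09-19 09:15:00", 1),
    ("2016-09-20 10:30:00", 2)
  ).toDF("raw", "value")
    .selectExpr("cast(raw as timestamp) as ts", "value")

  // '1 day' always means 86,400 seconds here, never a calendar day,
  // and the '1 hour' slide is likewise a fixed length of time.
  val counts = events
    .groupBy(window($"ts", "1 day", "1 hour"))
    .count()

  counts.show(truncate = false)
  spark.stop()
}
```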
spark git commit: [SPARK-17297][DOCS] Clarify window/slide duration as absolute time, not relative to a calendar
Repository: spark
Updated Branches: refs/heads/master 8f0c35a4d -> d720a4019

[SPARK-17297][DOCS] Clarify window/slide duration as absolute time, not relative to a calendar

## What changes were proposed in this pull request?
Clarify that slide and window duration are absolute, and not relative to a calendar.

## How was this patch tested?
Doc build (no functional change)

Author: Sean Owen

Closes #15142 from srowen/SPARK-17297.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d720a401
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d720a401
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d720a401

Branch: refs/heads/master
Commit: d720a4019460b6c284d0473249303c349df60a1f
Parents: 8f0c35a
Author: Sean Owen
Authored: Mon Sep 19 09:38:25 2016 +0100
Committer: Sean Owen
Committed: Mon Sep 19 09:38:25 2016 +0100
--
 R/pkg/R/functions.R                                 |  8 ++--
 .../main/scala/org/apache/spark/sql/functions.scala | 15 +++
 2 files changed, 17 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d720a401/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index ceedbe7..4d94b4c 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2713,11 +2713,15 @@ setMethod("from_unixtime", signature(x = "Column"),
 #' @param x a time Column. Must be of TimestampType.
 #' @param windowDuration a string specifying the width of the window, e.g. '1 second',
 #'                       '1 day 12 hours', '2 minutes'. Valid interval strings are 'week',
-#'                       'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
+#'                       'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. Note that
+#'                       the duration is a fixed length of time, and does not vary over time
+#'                       according to a calendar. For example, '1 day' always means 86,400,000
+#'                       milliseconds, not a calendar day.
 #' @param slideDuration a string specifying the sliding interval of the window. Same format as
 #'                      \code{windowDuration}. A new window will be generated every
 #'                      \code{slideDuration}. Must be less than or equal to
-#'                      the \code{windowDuration}.
+#'                      the \code{windowDuration}. This duration is likewise absolute, and does not
+#'                      vary according to a calendar.
 #' @param startTime the offset with respect to 1970-01-01 00:00:00 UTC with which to start
 #'                  window intervals. For example, in order to have hourly tumbling windows
 #'                  that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide

http://git-wip-us.apache.org/repos/asf/spark/blob/d720a401/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 18e736a..960c87f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2606,12 +2606,15 @@ object functions {
    * The time column must be of TimestampType.
    * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
    *                       `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
-   *                       valid duration identifiers.
+   *                       valid duration identifiers. Note that the duration is a fixed length of
+   *                       time, and does not vary over time according to a calendar. For example,
+   *                       `1 day` always means 86,400,000 milliseconds, not a calendar day.
    * @param slideDuration A string specifying the sliding interval of the window, e.g. `1 minute`.
    *                      A new window will be generated every `slideDuration`. Must be less than
    *                      or equal to the `windowDuration`. Check
    *                      [[org.apache.spark.unsafe.types.CalendarInterval]] for valid duration
-   *                      identifiers.
+   *                      identifiers. This duration is likewise absolute, and does not vary
+   *                      according to a calendar.
    * @param startTime The offset with respect to 1970-01-01 00:00:00 UTC with which to start
    *                  window intervals. For example, in order to have hourly