spark git commit: [SPARK-21789][PYTHON] Remove obsolete codes for parsing abstract schema strings
Repository: spark Updated Branches: refs/heads/master 5cd8ea99f -> 648a8626b [SPARK-21789][PYTHON] Remove obsolete codes for parsing abstract schema strings ## What changes were proposed in this pull request? This PR proposes to remove private functions that appear to be unused in the main code: `_split_schema_abstract`, `_parse_field_abstract`, `_parse_schema_abstract` and `_infer_schema_type`. ## How was this patch tested? Existing tests. Author: hyukjinkwon Closes #18647 from HyukjinKwon/remove-abstract. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/648a8626 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/648a8626 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/648a8626 Branch: refs/heads/master Commit: 648a8626b82d27d84db3e48bccfd73d020828586 Parents: 5cd8ea9 Author: hyukjinkwon Authored: Fri Sep 1 13:09:24 2017 +0900 Committer: hyukjinkwon Committed: Fri Sep 1 13:09:24 2017 +0900 -- python/pyspark/sql/tests.py | 10 --- python/pyspark/sql/types.py | 129 --- 2 files changed, 139 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/648a8626/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a2a3ceb..3d87ccf 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -894,16 +894,6 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual((126, -127, -32767, 32766, 2147483646, 2.5), tuple(r)) -from pyspark.sql.types import _parse_schema_abstract, _infer_schema_type -rdd = self.sc.parallelize([(127, -32768, 1.0, datetime(2010, 1, 1, 1, 1, 1), -{"a": 1}, (2,), [1, 2, 3])]) -abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]" -schema = _parse_schema_abstract(abstract) -typedSchema = _infer_schema_type(rdd.first(), schema) -df = self.spark.createDataFrame(rdd, typedSchema) -r = (127, -32768, 1.0, datetime(2010, 1, 1, 1, 1, 1), {"a": 1}, Row(b=2), [1, 2, 3]) -self.assertEqual(r, 
tuple(df.first())) - def test_struct_in_map(self): d = [Row(m={Row(i=1): Row(s="")})] df = self.sc.parallelize(d).toDF() http://git-wip-us.apache.org/repos/asf/spark/blob/648a8626/python/pyspark/sql/types.py -- diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index ecb8eb9..51bf7be 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1187,135 +1187,6 @@ def _create_converter(dataType): return convert_struct -def _split_schema_abstract(s): -""" -split the schema abstract into fields - ->>> _split_schema_abstract("a b c") -['a', 'b', 'c'] ->>> _split_schema_abstract("a(a b)") -['a(a b)'] ->>> _split_schema_abstract("a b[] c{a b}") -['a', 'b[]', 'c{a b}'] ->>> _split_schema_abstract(" ") -[] -""" - -r = [] -w = '' -brackets = [] -for c in s: -if c == ' ' and not brackets: -if w: -r.append(w) -w = '' -else: -w += c -if c in _BRACKETS: -brackets.append(c) -elif c in _BRACKETS.values(): -if not brackets or c != _BRACKETS[brackets.pop()]: -raise ValueError("unexpected " + c) - -if brackets: -raise ValueError("brackets not closed: %s" % brackets) -if w: -r.append(w) -return r - - -def _parse_field_abstract(s): -""" -Parse a field in schema abstract - ->>> _parse_field_abstract("a") -StructField(a,NullType,true) ->>> _parse_field_abstract("b(c d)") -StructField(b,StructType(...c,NullType,true),StructField(d... ->>> _parse_field_abstract("a[]") -StructField(a,ArrayType(NullType,true),true) ->>> _parse_field_abstract("a{[]}") -StructField(a,MapType(NullType,ArrayType(NullType,true),true),true) -""" -if set(_BRACKETS.keys()) & set(s): -idx = min((s.index(c) for c in _BRACKETS if c in s)) -name = s[:idx] -return StructField(name, _parse_schema_abstract(s[idx:]), True) -else: -return StructField(s, NullType(), True) - - -def _parse_schema_abstract(s): -""" -parse abstract into schema - ->>> _parse_schema_abstract("a b c") -StructType...a...b...c... 
->>> _parse_schema_abstract("a[b c] b{}") -StructType...a,ArrayType...b...c...b,MapType... ->>> _parse_schema_abstract("c{} d{a b}") -StructType...c,MapType...d,MapType...a...b... ->>> _parse_schema_abstract("a b(t)").fields[1] -StructField(b,StructTyp
spark git commit: [SPARK-21779][PYTHON] Simpler DataFrame.sample API in Python
Repository: spark Updated Branches: refs/heads/master f5e10a34e -> 5cd8ea99f [SPARK-21779][PYTHON] Simpler DataFrame.sample API in Python ## What changes were proposed in this pull request? This PR makes `withReplacement` optional in `DataFrame.sample(...)`, defaulting to `False`, consistent with the equivalent Scala / Java API. In short, the following examples are allowed: ```python >>> df = spark.range(10) >>> df.sample(0.5).count() 7 >>> df.sample(fraction=0.5).count() 3 >>> df.sample(0.5, seed=42).count() 5 >>> df.sample(fraction=0.5, seed=42).count() 5 ``` In addition, this PR adds some type-checking logic, as below: ```python >>> df = spark.range(10) >>> df.sample().count() ... TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got []. >>> df.sample(True).count() ... TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'bool'>]. >>> df.sample(42).count() ... TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'int'>]. >>> df.sample(fraction=False, seed="a").count() ... TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'bool'>, <type 'str'>]. >>> df.sample(seed=[1]).count() ... TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'list'>]. >>> df.sample(withReplacement="a", fraction=0.5, seed=1) ... TypeError: withReplacement (optional), fraction (required) and seed (optional) should be a bool, float and number; however, got [<type 'str'>, <type 'float'>, <type 'int'>]. ``` ## How was this patch tested? Manually tested; unit tests were added as doctests, and the built Python documentation was checked manually. Author: hyukjinkwon Closes #18999 from HyukjinKwon/SPARK-21779. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cd8ea99 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cd8ea99 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cd8ea99 Branch: refs/heads/master Commit: 5cd8ea99f084bee40ee18a0c8e33d0ca0aa6bb60 Parents: f5e10a3 Author: hyukjinkwon Authored: Fri Sep 1 13:01:23 2017 +0900 Committer: hyukjinkwon Committed: Fri Sep 1 13:01:23 2017 +0900 -- python/pyspark/sql/dataframe.py | 64 +--- python/pyspark/sql/tests.py | 18 ++ .../scala/org/apache/spark/sql/Dataset.scala| 3 +- 3 files changed, 77 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5cd8ea99/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index d1b2a9c..c19e599 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -659,19 +659,69 @@ class DataFrame(object): return DataFrame(self._jdf.distinct(), self.sql_ctx) @since(1.3) -def sample(self, withReplacement, fraction, seed=None): +def sample(self, withReplacement=None, fraction=None, seed=None): """Returns a sampled subset of this :class:`DataFrame`. +:param withReplacement: Sample with replacement or not (default False). +:param fraction: Fraction of rows to generate, range [0.0, 1.0]. +:param seed: Seed for sampling (default a random seed). + .. note:: This is not guaranteed to provide exactly the fraction specified of the total count of the given :class:`DataFrame`. ->>> df.sample(False, 0.5, 42).count() -2 +.. note:: `fraction` is required and, `withReplacement` and `seed` are optional. 
+ +>>> df = spark.range(10) +>>> df.sample(0.5, 3).count() +4 +>>> df.sample(fraction=0.5, seed=3).count() +4 +>>> df.sample(withReplacement=True, fraction=0.5, seed=3).count() +1 +>>> df.sample(1.0).count() +10 +>>> df.sample(fraction=1.0).count() +10 +>>> df.sample(False, fraction=1.0).count() +10 """ -assert fraction >= 0.0, "Negative fraction value: %s" % fraction -seed = seed if seed is not None else random.randint(0, sys.maxsize) -rdd = self._jdf.sample(withReplacement, fraction, long(seed)) -return DataFrame(rdd, self.sql_ctx) + +# For the cases below: +# sample(True, 0.5 [, seed]) +# sample(True, fraction=0.5 [, seed]) +# sample(withReplacement=False, fraction=0.5 [, seed]) +is_withReplacement_set = \ +type(withReplacement) == bool and isinstance(fraction, float) + +# For the case
spark git commit: [SPARK-21862][ML] Add overflow check in PCA
Repository: spark Updated Branches: refs/heads/master 96028e36b -> f5e10a34e [SPARK-21862][ML] Add overflow check in PCA ## What changes were proposed in this pull request? add overflow check in PCA, otherwise it is possible to throw `NegativeArraySizeException` when `k` and `numFeatures` are too large. The overflow checking formula is here: https://github.com/scalanlp/breeze/blob/master/math/src/main/scala/breeze/linalg/functions/svd.scala#L87 ## How was this patch tested? N/A Author: WeichenXu Closes #19078 from WeichenXu123/SVD_overflow_check. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5e10a34 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5e10a34 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5e10a34 Branch: refs/heads/master Commit: f5e10a34e644edf3cbce9a7714d31bc433f3ccbd Parents: 96028e3 Author: WeichenXu Authored: Thu Aug 31 16:25:10 2017 -0700 Committer: Joseph K. Bradley Committed: Thu Aug 31 16:25:10 2017 -0700 -- .../org/apache/spark/mllib/feature/PCA.scala | 19 +++ .../apache/spark/mllib/feature/PCASuite.scala| 6 ++ 2 files changed, 25 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f5e10a34/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala index aaecfa8..a01503f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala @@ -44,6 +44,11 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) { require(k <= numFeatures, s"source vector size $numFeatures must be no less than k=$k") +require(PCAUtil.memoryCost(k, numFeatures) < Int.MaxValue, + "The param k and numFeatures is too large for SVD computation. 
" + + "Try reducing the parameter k for PCA, or reduce the input feature " + + "vector dimension to make this tractable.") + val mat = new RowMatrix(sources) val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k) val densePC = pc match { @@ -110,3 +115,17 @@ class PCAModel private[spark] ( } } } + +private[feature] object PCAUtil { + + // This memory cost formula is from breeze code: + // https://github.com/scalanlp/breeze/blob/ + // 6e541be066d547a097f5089165cd7c38c3ca276d/math/src/main/scala/breeze/linalg/ + // functions/svd.scala#L87 + def memoryCost(k: Int, numFeatures: Int): Long = { +3L * math.min(k, numFeatures) * math.min(k, numFeatures) ++ math.max(math.max(k, numFeatures), 4L * math.min(k, numFeatures) +* math.min(k, numFeatures) + 4L * math.min(k, numFeatures)) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/f5e10a34/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala index 2f90afd..8eab124 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala @@ -48,4 +48,10 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext { } assert(pca.explainedVariance ~== explainedVariance relTol 1e-8) } + + test("memory cost computation") { +assert(PCAUtil.memoryCost(10, 100) < Int.MaxValue) +// check overflowing +assert(PCAUtil.memoryCost(4, 6) > Int.MaxValue) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17139][ML][FOLLOW-UP] Add convenient method `asBinary` for casting to BinaryLogisticRegressionSummary
Repository: spark Updated Branches: refs/heads/master cba69aeb4 -> 96028e36b [SPARK-17139][ML][FOLLOW-UP] Add convenient method `asBinary` for casting to BinaryLogisticRegressionSummary ## What changes were proposed in this pull request? add an "asBinary" method to LogisticRegressionSummary for convenient casting to BinaryLogisticRegressionSummary. ## How was this patch tested? Testcase updated. Author: WeichenXu Closes #19072 from WeichenXu123/mlor_summary_as_binary. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96028e36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96028e36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96028e36 Branch: refs/heads/master Commit: 96028e36b4d08427fdd94df55595849c2346ead4 Parents: cba69ae Author: WeichenXu Authored: Thu Aug 31 16:22:40 2017 -0700 Committer: Joseph K. Bradley Committed: Thu Aug 31 16:22:40 2017 -0700 -- .../spark/ml/classification/LogisticRegression.scala | 11 +++ .../ml/classification/LogisticRegressionSuite.scala | 6 ++ project/MimaExcludes.scala | 1 + 3 files changed, 18 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/96028e36/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 1869d51..f491a67 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -1473,6 +1473,17 @@ sealed trait LogisticRegressionSummary extends Serializable { /** Returns weighted averaged f1-measure. */ @Since("2.3.0") def weightedFMeasure: Double = multiclassMetrics.weightedFMeasure(1.0) + + /** + * Convenient method for casting to binary logistic regression summary. 
+ * This method will throws an Exception if the summary is not a binary summary. + */ + @Since("2.3.0") + def asBinary: BinaryLogisticRegressionSummary = this match { +case b: BinaryLogisticRegressionSummary => b +case _ => + throw new RuntimeException("Cannot cast to a binary summary.") + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/96028e36/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 6649fa4..6bf1253 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -256,6 +256,7 @@ class LogisticRegressionSuite val blorModel = lr.fit(smallBinaryDataset) assert(blorModel.summary.isInstanceOf[BinaryLogisticRegressionTrainingSummary]) + assert(blorModel.summary.asBinary.isInstanceOf[BinaryLogisticRegressionSummary]) assert(blorModel.binarySummary.isInstanceOf[BinaryLogisticRegressionTrainingSummary]) val mlorModel = lr.setFamily("multinomial").fit(smallMultinomialDataset) @@ -265,6 +266,11 @@ class LogisticRegressionSuite mlorModel.binarySummary } } +withClue("cannot cast summary to binary summary multiclass model") { + intercept[RuntimeException] { +mlorModel.summary.asBinary + } +} val mlorBinaryModel = lr.setFamily("multinomial").fit(smallBinaryDataset) assert(mlorBinaryModel.summary.isInstanceOf[BinaryLogisticRegressionTrainingSummary]) http://git-wip-us.apache.org/repos/asf/spark/blob/96028e36/project/MimaExcludes.scala -- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index eecda26..27e4183 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -62,6 +62,7 @@ object MimaExcludes { 
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.weightedRecall"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.weightedPrecision"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.weightedFMeasure"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSumm
spark git commit: [SPARK-21110][SQL] Structs, arrays, and other orderable datatypes should be usable in inequalities
Repository: spark Updated Branches: refs/heads/master 7ce110828 -> cba69aeb4 [SPARK-21110][SQL] Structs, arrays, and other orderable datatypes should be usable in inequalities ## What changes were proposed in this pull request? Allows `BinaryComparison` operators to work on any data type that actually supports ordering as verified by `TypeUtils.checkForOrderingExpr` instead of relying on the incomplete list `TypeCollection.Ordered` (which is removed by this PR). ## How was this patch tested? Updated unit tests to cover structs and arrays. Author: Andrew Ray Closes #18818 from aray/SPARK-21110. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cba69aeb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cba69aeb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cba69aeb Branch: refs/heads/master Commit: cba69aeb453d2489830f3e6e0473a64dee81989e Parents: 7ce1108 Author: Andrew Ray Authored: Thu Aug 31 15:08:03 2017 -0700 Committer: gatorsmile Committed: Thu Aug 31 15:08:03 2017 -0700 -- .../expressions/codegen/CodeGenerator.scala | 1 + .../sql/catalyst/expressions/predicates.scala | 58 +--- .../spark/sql/catalyst/util/TypeUtils.scala | 1 + .../spark/sql/types/AbstractDataType.scala | 12 .../catalyst/analysis/AnalysisErrorSuite.scala | 2 +- .../analysis/ExpressionTypeCheckingSuite.scala | 15 ++--- .../catalyst/expressions/PredicateSuite.scala | 37 +++-- 7 files changed, 58 insertions(+), 68 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cba69aeb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 3853863..4373971 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -594,6 +594,7 @@ class CodegenContext { case array: ArrayType => genComp(array, c1, c2) + " == 0" case struct: StructType => genComp(struct, c1, c2) + " == 0" case udt: UserDefinedType[_] => genEqual(udt.sqlType, c1, c2) +case NullType => "false" case _ => throw new IllegalArgumentException( "cannot generate equality code for un-comparable type: " + dataType.simpleString) http://git-wip-us.apache.org/repos/asf/spark/blob/cba69aeb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 613d620..d3071c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -448,6 +448,16 @@ case class Or(left: Expression, right: Expression) extends BinaryOperator with P abstract class BinaryComparison extends BinaryOperator with Predicate { + // Note that we need to give a superset of allowable input types since orderable types are not + // finitely enumerable. The allowable types are checked below by checkInputDataTypes. 
+ override def inputType: AbstractDataType = AnyDataType + + override def checkInputDataTypes(): TypeCheckResult = super.checkInputDataTypes() match { +case TypeCheckResult.TypeCheckSuccess => + TypeUtils.checkForOrderingExpr(left.dataType, this.getClass.getSimpleName) +case failure => failure + } + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { if (ctx.isPrimitiveType(left.dataType) && left.dataType != BooleanType // java boolean doesn't support > or < operator @@ -460,7 +470,7 @@ abstract class BinaryComparison extends BinaryOperator with Predicate { } } - protected lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType) + protected lazy val ordering: Ordering[Any] = TypeUtils.getInterpretedOrdering(left.dataType) } @@ -478,28 +488,13 @@ object Equality { } } +// TODO: although map type is not orderable, technically map type should be able to be used +// in equality comparison @ExpressionDescription( usage = "expr1 _FUNC_ expr2 - Returns true if `expr1` equals `expr2`, or false otherwise.") case class EqualTo(left: Expression, right: Expression) exte
spark git commit: [SPARK-17107][SQL][FOLLOW-UP] Remove redundant pushdown rule for Union
Repository: spark Updated Branches: refs/heads/master 501370d9d -> 7ce110828 [SPARK-17107][SQL][FOLLOW-UP] Remove redundant pushdown rule for Union ## What changes were proposed in this pull request? Also remove useless function `partitionByDeterministic` after the changes of https://github.com/apache/spark/pull/14687 ## How was this patch tested? N/A Author: gatorsmile Closes #19097 from gatorsmile/followupSPARK-17107. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ce11082 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ce11082 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ce11082 Branch: refs/heads/master Commit: 7ce110828608551f22f6cd2abdbd964844b45975 Parents: 501370d Author: gatorsmile Authored: Thu Aug 31 14:15:34 2017 -0700 Committer: gatorsmile Committed: Thu Aug 31 14:15:34 2017 -0700 -- .../spark/sql/catalyst/optimizer/Optimizer.scala | 15 --- 1 file changed, 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7ce11082/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 75d83bc..b73f70a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -380,21 +380,6 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper result.asInstanceOf[A] } - /** - * Splits the condition expression into small conditions by `And`, and partition them by - * deterministic, and finally recombine them by `And`. 
It returns an expression containing - * all deterministic expressions (the first field of the returned Tuple2) and an expression - * containing all non-deterministic expressions (the second field of the returned Tuple2). - */ - private def partitionByDeterministic(condition: Expression): (Expression, Expression) = { -val andConditions = splitConjunctivePredicates(condition) -andConditions.partition(_.deterministic) match { - case (deterministic, nondeterministic) => -deterministic.reduceOption(And).getOrElse(Literal(true)) -> -nondeterministic.reduceOption(And).getOrElse(Literal(true)) -} - } - def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Push down deterministic projection through UNION ALL - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21583][HOTFIX] Removed intercept in test causing failures
Repository: spark Updated Branches: refs/heads/master fc45c2c88 -> 501370d9d [SPARK-21583][HOTFIX] Removed intercept in test causing failures Removing a check in the ColumnarBatchSuite that depended on a Java assertion. This assertion is compiled out in the Maven builds, causing the test to fail. This part of the test is not specifically related to the functionality being tested here. Author: Bryan Cutler Closes #19098 from BryanCutler/hotfix-ColumnarBatchSuite-assertion. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/501370d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/501370d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/501370d9 Branch: refs/heads/master Commit: 501370d9d54acea398ab86b5c45cedd5d9471f66 Parents: fc45c2c Author: Bryan Cutler Authored: Thu Aug 31 11:32:10 2017 -0700 Committer: gatorsmile Committed: Thu Aug 31 11:32:10 2017 -0700 -- .../spark/sql/execution/vectorized/ColumnarBatchSuite.scala | 4 1 file changed, 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/501370d9/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 1f21d3c..ebf7661 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -1308,10 +1308,6 @@ class ColumnarBatchSuite extends SparkFunSuite { } } -intercept[java.lang.AssertionError] { - batch.getRow(100) -} - batch.close() allocator.close() } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20812][MESOS] Add secrets support to the dispatcher
Repository: spark Updated Branches: refs/heads/master 9696580c3 -> fc45c2c88 [SPARK-20812][MESOS] Add secrets support to the dispatcher Mesos has secrets primitives for environment and file-based secrets, this PR adds that functionality to the Spark dispatcher and the appropriate configuration flags. Unit tested and manually tested against a DC/OS cluster with Mesos 1.4. Author: ArtRand Closes #18837 from ArtRand/spark-20812-dispatcher-secrets-and-labels. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc45c2c8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc45c2c8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc45c2c8 Branch: refs/heads/master Commit: fc45c2c88a838b8f46659ebad2a8f3a9923bc95f Parents: 9696580 Author: ArtRand Authored: Thu Aug 31 10:58:13 2017 -0700 Committer: Marcelo Vanzin Committed: Thu Aug 31 10:58:41 2017 -0700 -- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- docs/running-on-mesos.md| 43 - docs/security.md| 3 + resource-managers/mesos/pom.xml | 2 +- .../org/apache/spark/deploy/mesos/config.scala | 33 +++- .../cluster/mesos/MesosClusterScheduler.scala | 136 +++- .../mesos/MesosSchedulerBackendUtil.scala | 7 +- .../cluster/mesos/MesosSchedulerUtils.scala | 16 +- .../mesos/MesosClusterSchedulerSuite.scala | 162 ++- 10 files changed, 386 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index de17507..e481b4d 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -138,7 +138,7 @@ lz4-java-1.4.0.jar machinist_2.11-0.6.1.jar macro-compat_2.11-1.1.1.jar mail-1.4.7.jar -mesos-1.0.0-shaded-protobuf.jar +mesos-1.3.0-shaded-protobuf.jar metrics-core-3.1.2.jar metrics-graphite-3.1.2.jar metrics-json-3.1.2.jar 
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/dev/deps/spark-deps-hadoop-2.7 -- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index da826a7..b8046b1 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -139,7 +139,7 @@ lz4-java-1.4.0.jar machinist_2.11-0.6.1.jar macro-compat_2.11-1.1.1.jar mail-1.4.7.jar -mesos-1.0.0-shaded-protobuf.jar +mesos-1.3.0-shaded-protobuf.jar metrics-core-3.1.2.jar metrics-graphite-3.1.2.jar metrics-json-3.1.2.jar http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/docs/running-on-mesos.md -- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index c12b858..e0944bc 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -33,7 +33,8 @@ To get started, follow the steps below to install Mesos and deploy Spark jobs vi # Installing Mesos Spark {{site.SPARK_VERSION}} is designed for use with Mesos {{site.MESOS_VERSION}} or newer and does not -require any special patches of Mesos. +require any special patches of Mesos. File and environment-based secrets support requires Mesos 1.3.0 or +newer. If you already have a Mesos cluster running, you can skip this Mesos installation step. @@ -430,7 +431,8 @@ See the [configuration page](configuration.html) for information on Spark config spark.mesos.secret (none) -Set the secret with which Spark framework will use to authenticate with Mesos. +Set the secret with which Spark framework will use to authenticate with Mesos. Used, for example, when +authenticating with the registry. @@ -483,6 +485,43 @@ See the [configuration page](configuration.html) for information on Spark config + spark.mesos.driver.secret.envkeys + (none) + +A comma-separated list that, if set, the contents of the secret referenced +by spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be +set to the provided environment variable in the driver's process. 
+ + + +spark.mesos.driver.secret.filenames + (none) + +A comma-separated list that, if set, the contents of the secret referenced by +spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be +written to the provided file. Paths are relative to the container's work +directory. Absolute paths must already exist. Consult the Mesos Secret +protobuf for more information. + + + + spark.mesos.driver.secret.names + (none) + +
spark git commit: [SPARK-21886][SQL] Use SparkSession.internalCreateDataFrame to create…
Repository: spark Updated Branches: refs/heads/master 19b0240d4 -> 9696580c3 [SPARK-21886][SQL] Use SparkSession.internalCreateDataFrame to create Dataset with LogicalRDD logical operator ## What changes were proposed in this pull request? Reusing `SparkSession.internalCreateDataFrame` wherever possible (to cut duplication) ## How was this patch tested? Local build and waiting for Jenkins Author: Jacek Laskowski Closes #19095 from jaceklaskowski/SPARK-21886-internalCreateDataFrame. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9696580c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9696580c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9696580c Branch: refs/heads/master Commit: 9696580c33c68c3de32694fbefb93c509d525d94 Parents: 19b0240 Author: Jacek Laskowski Authored: Thu Aug 31 09:44:29 2017 -0700 Committer: gatorsmile Committed: Thu Aug 31 09:44:29 2017 -0700 -- .../org/apache/spark/sql/DataFrameReader.scala | 10 ++ .../scala/org/apache/spark/sql/SparkSession.scala | 16 2 files changed, 10 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9696580c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 8209cec..4f375e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -410,10 +410,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { parsedOptions.columnNameOfCorruptRecord) iter.flatMap(parser.parse) } - -Dataset.ofRows( - sparkSession, - LogicalRDD(schema.toAttributes, parsed, isStreaming = jsonDataset.isStreaming)(sparkSession)) +sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming) } /** @@ -473,10
+470,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { parsedOptions.columnNameOfCorruptRecord) iter.flatMap(parser.parse) } - -Dataset.ofRows( - sparkSession, - LogicalRDD(schema.toAttributes, parsed, isStreaming = csvDataset.isStreaming)(sparkSession)) +sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/9696580c/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 863c316..d5ab53a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -559,8 +559,7 @@ class SparkSession private( } /** - * Creates a `DataFrame` from an RDD[Row]. - * User can specify whether the input rows should be converted to Catalyst rows. + * Creates a `DataFrame` from an `RDD[InternalRow]`. */ private[sql] def internalCreateDataFrame( catalystRows: RDD[InternalRow], @@ -576,7 +575,7 @@ class SparkSession private( } /** - * Creates a `DataFrame` from an RDD[Row]. + * Creates a `DataFrame` from an `RDD[Row]`. * User can specify whether the input rows should be converted to Catalyst rows. */ private[sql] def createDataFrame( @@ -589,10 +588,9 @@ class SparkSession private( val encoder = RowEncoder(schema) rowRDD.map(encoder.toRow) } else { - rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)} + rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) } } -val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self) -Dataset.ofRows(self, logicalPlan) +internalCreateDataFrame(catalystRows, schema) } @@ -737,13 +735,15 @@ class SparkSession private( } /** - * Apply a schema defined by the schema to an RDD. It is only used by PySpark. + * Apply `schema` to an RDD. 
+ * + * @note Used by PySpark only */ private[sql] def applySchemaToPythonRDD( rdd: RDD[Array[Any]], schema: StructType): DataFrame = { val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow]) -Dataset.ofRows(self, LogicalRDD(schema.toAttributes, rowRdd)(self)) +internalCreateDataFrame(rowRdd, schema) } /** - To unsubscribe, e-mail: commits-uns
spark git commit: [SPARK-21878][SQL][TEST] Create SQLMetricsTestUtils
Repository: spark Updated Branches: refs/heads/master 964b507c7 -> 19b0240d4 [SPARK-21878][SQL][TEST] Create SQLMetricsTestUtils ## What changes were proposed in this pull request? Creates `SQLMetricsTestUtils` for the utility functions of both Hive-specific and the other SQLMetrics test cases. Also, move two SQLMetrics test cases from sql/hive to sql/core. ## How was this patch tested? N/A Author: gatorsmile Closes #19092 from gatorsmile/rewriteSQLMetrics. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19b0240d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19b0240d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19b0240d Branch: refs/heads/master Commit: 19b0240d42f81008d918e7a67cb17647bca1500b Parents: 964b507 Author: gatorsmile Authored: Thu Aug 31 09:16:26 2017 -0700 Committer: gatorsmile Committed: Thu Aug 31 09:16:26 2017 -0700 -- .../sql/execution/metric/SQLMetricsSuite.scala | 151 +-- .../execution/metric/SQLMetricsTestUtils.scala | 270 +++ .../sql/hive/execution/SQLMetricsSuite.scala| 106 +--- 3 files changed, 277 insertions(+), 250 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/19b0240d/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index fd79323..0dc612e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -19,99 +19,21 @@ package org.apache.spark.sql.execution.metric import java.io.File -import scala.collection.mutable.HashMap import scala.util.Random import org.apache.spark.SparkFunSuite -import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql._ 
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation -import org.apache.spark.sql.execution.SparkPlanInfo -import org.apache.spark.sql.execution.ui.SparkPlanGraph import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.{AccumulatorContext, JsonProtocol} -class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { +class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with SharedSQLContext { import testImplicits._ /** - * Call `df.collect()` and collect necessary metrics from execution data. - * - * @param df `DataFrame` to run - * @param expectedNumOfJobs number of jobs that will run - * @param expectedNodeIds the node ids of the metrics to collect from execution data. - */ - private def getSparkPlanMetrics( - df: DataFrame, - expectedNumOfJobs: Int, - expectedNodeIds: Set[Long], - enableWholeStage: Boolean = false): Option[Map[Long, (String, Map[String, Any])]] = { -val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet -withSQLConf("spark.sql.codegen.wholeStage" -> enableWholeStage.toString) { - df.collect() -} -sparkContext.listenerBus.waitUntilEmpty(1) -val executionIds = - spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds) -assert(executionIds.size === 1) -val executionId = executionIds.head -val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs -// Use "<=" because there is a race condition that we may miss some jobs -// TODO Change it to "=" once we fix the race condition that missing the JobStarted event. 
-assert(jobs.size <= expectedNumOfJobs) -if (jobs.size == expectedNumOfJobs) { - // If we can track all jobs, check the metric values - val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId) - val metrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan( -df.queryExecution.executedPlan)).allNodes.filter { node => -expectedNodeIds.contains(node.id) - }.map { node => -val nodeMetrics = node.metrics.map { metric => - val metricValue = metricValues(metric.accumulatorId) - (metric.name, metricValue) -}.toMap -(node.id, node.name -> nodeMetrics) - }.toMap - Some(metrics) -} else { - // TODO Remove this "else" once we fix the race condition that missing the JobStarted event. - // Since we cannot track all jobs, the metric values could be wrong and we should not check - // them. - logWarning("Due to a race condition, we