[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18659 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81955/ Test FAILed.
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18659 **[Test build #81955 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81955/testReport)** for PR 18659 at commit [`f451d65`](https://github.com/apache/spark/commit/f451d652a2656113cce1f0763e17c73ed2d03c44).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19259: [BACKPORT-2.1][SPARK-19318][SPARK-22041][SQL] Doc...
Github user wangyum closed the pull request at: https://github.com/apache/spark/pull/19259
[GitHub] spark pull request #19284: [SPARK-22067][SQL] ArrowWriter should use positio...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19284
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19284 Thanks! merging to master.
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139859176 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateArray, Literal, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite { + + test("fails analysis if parameters are invalid") { +def assertEqual[T](left: T, right: T): Unit = { --- End diff -- oh, I'll remove this. Previously I put some other logic here, but we should remove it now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Merged build finished. Test PASSed.
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81954/ Test PASSed.
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81954 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81954/testReport)** for PR 19196 at commit [`4eb7f4f`](https://github.com/apache/spark/commit/4eb7f4f6df3f2d5ae831bf15715651598e52c3e6).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class TestStatefulOperator(`
[GitHub] spark issue #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSpli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19278 **[Test build #81959 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81959/testReport)** for PR 19278 at commit [`cc30578`](https://github.com/apache/spark/commit/cc30578d2d25d3345821793fcf2ce030cf991a92).
[GitHub] spark issue #19208: [SPARK-21087] [ML] CrossValidator, TrainValidationSplit ...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19208 @smurching I will update this PR after #19278 is merged, because this PR now depends on that one. Thanks!
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19284 LGTM.
[GitHub] spark issue #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSpli...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19278 @BryanCutler The reason I added `skipParams` is that, if we don't use `DefaultParamsReader.getAndSetParams`, we have to hardcode all the params, which is troublesome, and every time we add a new param we also have to update the hardcoded list, which is easy to forget and can cause bugs. But if we use `DefaultParamsReader.getAndSetParams` without supporting `skipParams`, the `estimatorParamMaps` param is difficult to handle. So this design is a balance between these concerns, although it makes the code a little weird.
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139856165 --- Diff: python/pyspark/sql/functions.py --- @@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()): | 8| JOHN DOE| 22| +--+--++ """ -def _udf(f, returnType=StringType()): -udf_obj = UserDefinedFunction(f, returnType) -return udf_obj._wrapped() +return _create_udf(f, returnType=returnType, vectorized=False) -# decorator @udf, @udf() or @udf(dataType()) -if f is None or isinstance(f, (str, DataType)): -# If DataType has been passed as a positional argument -# for decorator use it as a returnType -return_type = f or returnType -return functools.partial(_udf, returnType=return_type) + +@since(2.3) +def pandas_udf(f=None, returnType=StringType()): +""" +Creates a :class:`Column` expression representing a user defined function (UDF) that accepts +`Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length. + +:param f: python function if used as a standalone function +:param returnType: a :class:`pyspark.sql.types.DataType` object + +# TODO: doctest +""" +import inspect +# If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder +if inspect.getargspec(f).keywords is None: +return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True) --- End diff -- Ah, I was thinking that disallowing 0-parameter panda_udf could be an option ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
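The wrapping being discussed can be reproduced outside Spark. The sketch below only illustrates the mechanism in the diff; `_wrap_if_no_kwargs` and `plus_one` are hypothetical names, and `inspect.getargspec` is the API the patch uses (deprecated in newer Python 3, where `inspect.getfullargspec(f).varkw` is the equivalent):

```python
import inspect

def _wrap_if_no_kwargs(f):
    # getargspec(f).keywords is the name of f's **kwargs parameter, or None if
    # f does not declare one; in that case, wrap f so that any extra keyword
    # arguments passed by the caller are accepted and silently dropped.
    if inspect.getargspec(f).keywords is None:
        return lambda *a, **kwargs: f(*a)
    return f

def plus_one(v):              # declares no **kwargs
    return v + 1

wrapped = _wrap_if_no_kwargs(plus_one)
print(wrapped(1, length=10))  # the extra 'length' kwarg is swallowed; prints 2
```

The wrapper is what lets a plain positional-argument UDF ignore whatever optional keyword arguments the framework chooses to pass along, which is the behavior the 0-parameter discussion above is about.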
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139855188 --- Diff: python/pyspark/serializers.py --- @@ -199,6 +211,46 @@ def __repr__(self): return "ArrowSerializer" +class ArrowPandasSerializer(ArrowSerializer): +""" +Serializes Pandas.Series as Arrow data. +""" + +def __init__(self): +super(ArrowPandasSerializer, self).__init__() + +def dumps(self, series): +""" +Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or +a list of series accompanied by an optional pyarrow type to coerce the data to. +""" +import pyarrow as pa +# Make input conform to [(series1, type1), (series2, type2), ...] +if not isinstance(series, (list, tuple)) or \ +(len(series) == 2 and isinstance(series[1], pa.DataType)): +series = [series] +series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series] --- End diff -- This might not be a big deal but .. I usually use generator if it iterates once and is discarded. This should consume less memory too as list comprehension should be evaluated once first. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
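Concretely, the suggestion is to turn the list comprehension in the last line of the quoted diff into a generator expression. A standalone sketch, assuming pandas is installed; the rest of the `dumps` logic is omitted:

```python
import pandas as pd

series = [pd.Series([1, 2, 3]), (pd.Series([4, 5, 6]), None)]

# Form in the diff: a list comprehension materializes all (series, type) pairs up front.
pairs_list = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]

# Suggested form: a generator expression yields the pairs lazily, since they are
# only iterated once when the Arrow arrays are built.
pairs_gen = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)

for s, t in pairs_gen:
    print(len(s), t)  # prints "3 None" twice
```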
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19271 **[Test build #81958 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81958/testReport)** for PR 19271 at commit [`8e13959`](https://github.com/apache/spark/commit/8e139594ce164a18fa54df680afbc213691da081).
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139855087 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala --- @@ -303,16 +304,16 @@ object CrossValidatorModel extends MLReadable[CrossValidatorModel] { val (metadata, estimator, evaluator, estimatorParamMaps) = ValidatorParams.loadImpl(path, sc, className) val numFolds = (metadata.params \ "numFolds").extract[Int] - val seed = (metadata.params \ "seed").extract[Long] val bestModelPath = new Path(path, "bestModel").toString val bestModel = DefaultParamsReader.loadParamsInstance[Model[_]](bestModelPath, sc) val avgMetrics = (metadata.metadata \ "avgMetrics").extract[Seq[Double]].toArray + val model = new CrossValidatorModel(metadata.uid, bestModel, avgMetrics) model.set(model.estimator, estimator) .set(model.evaluator, evaluator) .set(model.estimatorParamMaps, estimatorParamMaps) -.set(model.numFolds, numFolds) -.set(model.seed, seed) + DefaultParamsReader.getAndSetParams(model, metadata, skipParams = List("estimatorParamMaps")) --- End diff -- No, because `estimator` and `evaluator` aren't included in the metadata. You can check the `saveImpl`.
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139854984 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala --- @@ -212,14 +213,12 @@ object CrossValidator extends MLReadable[CrossValidator] { val (metadata, estimator, evaluator, estimatorParamMaps) = ValidatorParams.loadImpl(path, sc, className) - val numFolds = (metadata.params \ "numFolds").extract[Int] - val seed = (metadata.params \ "seed").extract[Long] - new CrossValidator(metadata.uid) + val cv = new CrossValidator(metadata.uid) .setEstimator(estimator) .setEvaluator(evaluator) .setEstimatorParamMaps(estimatorParamMaps) -.setNumFolds(numFolds) -.setSeed(seed) + DefaultParamsReader.getAndSetParams(cv, metadata, skipParams = List("estimatorParamMaps")) --- End diff -- No, because `estimator` and `evaluator` aren't included in the metadata. You can check the `saveImpl`.
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139854872 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala --- @@ -122,7 +123,7 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St // Fit models in a Future for training in parallel logDebug(s"Train split with multiple sets of parameters.") -val modelFutures = epm.map { paramMap => +val modelFutures = epm.map { case paramMap => --- End diff -- No, I will remove it. Sorry.
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139854853 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala --- @@ -283,6 +282,8 @@ object CrossValidatorModel extends MLReadable[CrossValidatorModel] { ValidatorParams.validateParams(instance) +protected var shouldPersistSubModels: Boolean = false + --- End diff -- Yes, I will remove it. Sorry.
[GitHub] spark issue #19270: [SPARK-21809] : Change Stage Page to use datatables to s...
Github user pgandhi999 commented on the issue: https://github.com/apache/spark/pull/19270 No problem. Thank you for your valuable comments.
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19284 Merged build finished. Test PASSed.
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19284 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81952/ Test PASSed.
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19284 **[Test build #81952 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81952/testReport)** for PR 19284 at commit [`5ac572d`](https://github.com/apache/spark/commit/5ac572d3cb2422f57e101fa2cbc761f4b748daa6).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/18704 cc @michal-databricks any thoughts on this?
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19285 Can one of the admins verify this patch?
[GitHub] spark issue #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calculate i...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19281 @gatorsmile @cloud-fan could you trigger tests if it is worth fixing? Thanks.
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19285 Hi @cloud-fan @jiangxb1987, would you mind taking a look? Thanks a lot.
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
GitHub user ConeyLiu opened a pull request: https://github.com/apache/spark/pull/19285 [SPARK-22068][CORE] Reduce the duplicate code between putIteratorAsValues and putIteratorAsBytes

## What changes were proposed in this pull request?
The code logic between `MemoryStore.putIteratorAsValues` and `MemoryStore.putIteratorAsBytes` is almost the same, so we should reduce the duplicate code between them.

## How was this patch tested?
Existing UT.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/ConeyLiu/spark rmemorystore Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19285.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19285
commit 2c20dcbcf499aee5d6fbbb80f4803b3cad37c17c Author: Xianyang Liu Date: 2017-09-17T09:53:49Z refactor memorystore
commit 120564303641a92d32ec434dba5076771f6d6e80 Author: Xianyang Liu Date: 2017-09-19T08:47:24Z fix conflicts
commit 92e1d51b18a810307a0b6d0cb761925a0429ead2 Author: Xianyang Liu Date: 2017-09-19T23:45:17Z fix bug and add some comments
commit 6e2e29be7ad9d4bf3aae2d55fb4bf93c3286009b Author: Xianyang Liu Date: 2017-09-20T00:28:35Z better variable name
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19271 **[Test build #81957 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81957/testReport)** for PR 19271 at commit [`8a551d7`](https://github.com/apache/spark/commit/8a551d7fc045fd633cc15c14b27133a13eacb727).
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139852354 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) +expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139852189 --- Diff: python/pyspark/serializers.py --- @@ -199,6 +211,46 @@ def __repr__(self): return "ArrowSerializer" +class ArrowPandasSerializer(ArrowSerializer): +""" +Serializes Pandas.Series as Arrow data. +""" + +def __init__(self): +super(ArrowPandasSerializer, self).__init__() + +def dumps(self, series): +""" +Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or +a list of series accompanied by an optional pyarrow type to coerce the data to. +""" +import pyarrow as pa +# Make input conform to [(series1, type1), (series2, type2), ...] +if not isinstance(series, (list, tuple)) or \ +(len(series) == 2 and isinstance(series[1], pa.DataType)): +series = [series] +series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series] --- End diff -- Yea, it actually affects the performance because we can avoid an extra loop:

```python
def im_map(x):
    print("I am map %s" % x)
    return x

def im_gen(x):
    print("I am gen %s" % x)
    return x

def im_list(x):
    print("I am list %s" % x)
    return x

items = list(range(3))
map(im_map, [im_list(item) for item in items])
map(im_map, (im_gen(item) for item in items))
```

And .. this actually affects the performance up to my knowledge:

```python
import time

items = list(xrange(int(1e8)))

for _ in xrange(10):
    s = time.time()
    _ = map(lambda x: x, [item for item in items])
    print "I am list comprehension with a list: %s" % (time.time() - s)

    s = time.time()
    _ = map(lambda x: x, (item for item in items))
    print "I am generator expression with a list: %s" % (time.time() - s)
```

This gives me ~13% improvement :).
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139852081 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) +expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139850475 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala --- @@ -283,6 +282,8 @@ object CrossValidatorModel extends MLReadable[CrossValidatorModel] { ValidatorParams.validateParams(instance) +protected var shouldPersistSubModels: Boolean = false + --- End diff -- Is this included by accident?
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139850997 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala --- @@ -122,7 +123,7 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St // Fit models in a Future for training in parallel logDebug(s"Train split with multiple sets of parameters.") -val modelFutures = epm.map { paramMap => +val modelFutures = epm.map { case paramMap => --- End diff -- Was this intentional?
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139851719 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala --- @@ -212,14 +213,12 @@ object CrossValidator extends MLReadable[CrossValidator] { val (metadata, estimator, evaluator, estimatorParamMaps) = ValidatorParams.loadImpl(path, sc, className) - val numFolds = (metadata.params \ "numFolds").extract[Int] - val seed = (metadata.params \ "seed").extract[Long] - new CrossValidator(metadata.uid) + val cv = new CrossValidator(metadata.uid) .setEstimator(estimator) .setEvaluator(evaluator) .setEstimatorParamMaps(estimatorParamMaps) -.setNumFolds(numFolds) -.setSeed(seed) + DefaultParamsReader.getAndSetParams(cv, metadata, skipParams = List("estimatorParamMaps")) --- End diff -- Do you also need to skip `estimator` and `evaluator`?
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139851109 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala --- @@ -303,16 +304,16 @@ object CrossValidatorModel extends MLReadable[CrossValidatorModel] { val (metadata, estimator, evaluator, estimatorParamMaps) = ValidatorParams.loadImpl(path, sc, className) val numFolds = (metadata.params \ "numFolds").extract[Int] - val seed = (metadata.params \ "seed").extract[Long] val bestModelPath = new Path(path, "bestModel").toString val bestModel = DefaultParamsReader.loadParamsInstance[Model[_]](bestModelPath, sc) val avgMetrics = (metadata.metadata \ "avgMetrics").extract[Seq[Double]].toArray + val model = new CrossValidatorModel(metadata.uid, bestModel, avgMetrics) model.set(model.estimator, estimator) .set(model.evaluator, evaluator) .set(model.estimatorParamMaps, estimatorParamMaps) -.set(model.numFolds, numFolds) -.set(model.seed, seed) + DefaultParamsReader.getAndSetParams(model, metadata, skipParams = List("estimatorParamMaps")) --- End diff -- Should this also skip `estimator` and `evaluator`?
[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139850950 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -101,14 +101,15 @@ case class SortMergeJoinExec( s"${getClass.getSimpleName} should not take $x as the JoinType") } - /** - * For SMJ, child's output must have been sorted on key or expressions with the same order as - * key, so we can get ordering for key from child's output ordering. - */ private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder]) : Seq[SortOrder] = { -keys.zip(childOutputOrdering).map { case (key, childOrder) => - SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key) +val requiredOrdering = requiredOrders(keys) +if (SparkPlan.orderingSatisfies(childOutputOrdering, requiredOrdering)) { --- End diff -- This looks good to me. cc @wzhfy who last touched this code
[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139850236 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala --- @@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext { } } + def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = { --- End diff -- BTW: since this is only used by one test case, we could put it inside the test case method and not make it class level
[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139850127 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala --- @@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext { } } + def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = { --- End diff -- nit: comparing the counts does not ensure that the sorts are in the right place. I wish there were an easier way to check that here, but I can't think of any.
[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139849801 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ object SparkPlan { private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("subquery", 16)) + + /** + * Returns if the actual ordering satisfies the required ordering. + * + * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix. + */ + def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = { --- End diff -- SparkPlan is the node for a physical operator in SQL, so it doesn't feel like a good place to have this. Since one would want to have all methods related to `SortOrder` in a single place, the object feels like a better option. We can revisit if more such methods are added to that object and refactor it into a class.
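The docstring in the diff states the contract: ordering A satisfies ordering B if and only if B is equivalent to A or to a prefix of A. A simplified sketch of that rule, with sort orders modelled as plain strings and semantic equivalence reduced to `==` (the real method compares `SortOrder` expressions, so this is only an illustration of the prefix check):

```python
def ordering_satisfies(actual, required):
    # An empty 'required' is trivially satisfied; otherwise 'required' must
    # match a prefix of 'actual' element by element.
    if len(required) > len(actual):
        return False
    return all(a == r for a, r in zip(actual, required))

# Child output already sorted by (key1 ASC, key2 ASC):
assert ordering_satisfies(["key1 ASC", "key2 ASC"], ["key1 ASC"])               # prefix, satisfied
assert ordering_satisfies(["key1 ASC", "key2 ASC"], ["key1 ASC", "key2 ASC"])   # full match
assert not ordering_satisfies(["key1 ASC", "key2 ASC"], ["key2 ASC"])           # not a prefix
```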
[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19170 Merged build finished. Test FAILed.
[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19170 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81951/ Test FAILed.
[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19170 **[Test build #81951 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81951/testReport)** for PR 19170 at commit [`04c1e2a`](https://github.com/apache/spark/commit/04c1e2aa24c61f13f1df5148416bb00f0649fcaf).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/18659
> what if users installed an older version of pyarrow? Shall we throw exception and ask them to upgrade, or work around type casting issue?

@cloud-fan, regarding the handling of problems that might come up when using different versions of Arrow, I think we should first decide on a minimum supported version, then maybe we could make that version of pyarrow a requirement for PySpark. If we decide to use 0.4.1, which we currently use, then we should probably work around the type casting issue and make sure this PR works with that version.
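If the "throw an exception and ask them to upgrade" route is taken, the check could look roughly like the sketch below. `require_minimum_pyarrow_version` is a hypothetical helper, `0.4.1` is only the version mentioned in the thread rather than a decided minimum, and `LooseVersion` is used for the comparison:

```python
from distutils.version import LooseVersion

_MINIMUM_PYARROW_VERSION = "0.4.1"  # assumed minimum for this sketch

def require_minimum_pyarrow_version():
    """Fail fast with a clear message if pyarrow is missing or too old."""
    try:
        import pyarrow
    except ImportError:
        raise ImportError("pyarrow >= %s must be installed for vectorized UDFs"
                          % _MINIMUM_PYARROW_VERSION)
    if LooseVersion(pyarrow.__version__) < LooseVersion(_MINIMUM_PYARROW_VERSION):
        raise ImportError("pyarrow >= %s must be installed; found %s"
                          % (_MINIMUM_PYARROW_VERSION, pyarrow.__version__))
```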
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19280 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81949/ Test PASSed.
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19280 Merged build finished. Test PASSed.
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19280 **[Test build #81949 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81949/testReport)** for PR 19280 at commit [`20e3585`](https://github.com/apache/spark/commit/20e3585eac16ef3bfe403ec23f57a2705ff47ecb).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139847070 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") +val mode = if (keytab64 != null) "keytab" else "tgt" +val secretFile = if (keytab64 != null) keytab64 else tgt64 +logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") +runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, 
trying agin in 20 seconds", e) + credentialRenewerThread.schedule(this, 20, TimeUnit.SECONDS) + return + } + scheduleRenewal(this) +} + } +scheduleRenewal(credentialRenewerRunnable) + } + + private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = { +logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}") +// Get new delegation tokens by logging in with a new UGI +// inspired by AMCredentialRenewer.scala:L174 +val ugi = if (mode == "keytab") { + UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile) +} else { + UserGroupInformation.getUGIFromTicketCache(secretFile, principal) +} +val tempCreds = ugi.getCredentia
[GitHub] spark issue #19259: [BACKPORT-2.1][SPARK-19318][SPARK-22041][SQL] Docker tes...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19259 Thanks! Merged to 2.1. Could you close this PR?
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17819 **[Test build #81956 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81956/testReport)** for PR 17819 at commit [`92ef9bd`](https://github.com/apache/spark/commit/92ef9bde1e048eef7e3b530286723cad5773debc).
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/17819 @MLnick I have no strong opinion, but @WeichenXu123 seems to prefer merging the new API into the current `Bucketizer`.
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/17819 retest this please.
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/18659 Regarding the upgrade of Arrow, the concerns of #18974 are still valid - namely, it has some risk, and upgrading the Python side is a good amount of work that only a couple of people have access to do. Would it be better to discuss the upgrade strategy in another JIRA?
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/19196 LGTM.
[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139844429 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ object SparkPlan { private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("subquery", 16)) + + /** + * Returns if the actual ordering satisfies the required ordering. + * + * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix. + */ + def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = { --- End diff -- Actually I struggled over where to put this, in SortOrder or SparkPlan. It doesn't look like anywhere else uses Seq[SortOrder] so far, so I chose to leave this out of SortOrder. I think, though, if we see potential usage of Seq[SortOrder] elsewhere, it might be worth wrapping it in a class. Agree? Either way, I could put it into SortOrder for now.
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139844415 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side --- End diff -- nit: why the weird indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/19284 Oops, I referenced the wrong JIRA; it was ARROW-1443 (PR: https://github.com/apache/arrow/pull/1022). ArrowBuf.setBytes was not using the destination buffer properly.
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139843267 --- Diff: python/pyspark/sql/functions.py --- @@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()): | 8| JOHN DOE| 22| +--+--++ """ -def _udf(f, returnType=StringType()): -udf_obj = UserDefinedFunction(f, returnType) -return udf_obj._wrapped() +return _create_udf(f, returnType=returnType, vectorized=False) -# decorator @udf, @udf() or @udf(dataType()) -if f is None or isinstance(f, (str, DataType)): -# If DataType has been passed as a positional argument -# for decorator use it as a returnType -return_type = f or returnType -return functools.partial(_udf, returnType=return_type) + +@since(2.3) +def pandas_udf(f=None, returnType=StringType()): +""" +Creates a :class:`Column` expression representing a user defined function (UDF) that accepts +`Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length. + +:param f: python function if used as a standalone function +:param returnType: a :class:`pyspark.sql.types.DataType` object + +# TODO: doctest +""" +import inspect +# If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder +if inspect.getargspec(f).keywords is None: +return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True) --- End diff -- I agree it is still a bit weird.. Did you mean disallowing 0-parameter panda_udfs or requiring 0-parameter panda_udfs to accept `kwargs`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18659 **[Test build #81955 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81955/testReport)** for PR 18659 at commit [`f451d65`](https://github.com/apache/spark/commit/f451d652a2656113cce1f0763e17c73ed2d03c44).
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/18659 Thanks for the reviews @ueshin @viirya and @HyukjinKwon ! I updated with your comments
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/19284 LGTM. What's the Arrow bug you mentioned? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19282: [SPARK-22066][BUILD] Update checkstyle to 8.2, enable it...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19282 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81948/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19282: [SPARK-22066][BUILD] Update checkstyle to 8.2, enable it...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19282 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19282: [SPARK-22066][BUILD] Update checkstyle to 8.2, enable it...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19282 **[Test build #81948 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81948/testReport)** for PR 19282 at commit [`93ff675`](https://github.com/apache/spark/commit/93ff67576566e93eaca3220507049166133ad4b1). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139841884 --- Diff: python/pyspark/serializers.py --- @@ -199,6 +211,46 @@ def __repr__(self): return "ArrowSerializer" +class ArrowPandasSerializer(ArrowSerializer): +""" +Serializes Pandas.Series as Arrow data. +""" + +def __init__(self): +super(ArrowPandasSerializer, self).__init__() + +def dumps(self, series): +""" +Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or +a list of series accompanied by an optional pyarrow type to coerce the data to. +""" +import pyarrow as pa +# Make input conform to [(series1, type1), (series2, type2), ...] +if not isinstance(series, (list, tuple)) or \ +(len(series) == 2 and isinstance(series[1], pa.DataType)): +series = [series] +series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series] +arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series] +batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))]) +return super(ArrowPandasSerializer, self).dumps(batch) + +def loads(self, obj): +""" +Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series +followed by a dictionary containing length of the loaded batches. +""" +import pyarrow as pa +reader = pa.RecordBatchFileReader(pa.BufferReader(obj)) +batches = [reader.get_batch(i) for i in range(reader.num_record_batches)] +# NOTE: a 0-parameter pandas_udf will produce an empty batch that can have num_rows set +num_rows = sum([batch.num_rows for batch in batches]) --- End diff -- I guess this makes sense because its a summation, no sense in making a list then adding it all up --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139841646 --- Diff: python/pyspark/serializers.py --- @@ -199,6 +211,46 @@ def __repr__(self): return "ArrowSerializer" +class ArrowPandasSerializer(ArrowSerializer): +""" +Serializes Pandas.Series as Arrow data. +""" + +def __init__(self): +super(ArrowPandasSerializer, self).__init__() + +def dumps(self, series): +""" +Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or +a list of series accompanied by an optional pyarrow type to coerce the data to. +""" +import pyarrow as pa +# Make input conform to [(series1, type1), (series2, type2), ...] +if not isinstance(series, (list, tuple)) or \ +(len(series) == 2 and isinstance(series[1], pa.DataType)): +series = [series] +series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series] --- End diff -- That would work, but does it help much since `series` will already be a list or tuple? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139841490 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139841435 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils + +case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan) + extends RunnableCommand { --- End diff -- I know similar tasks do the same, but this should not implement `RunnableCommand`. I'm not sure the original intent for it, but I think `RunnableCommand` should be used for small tasks that are carried out on the driver, like DDL. Using `RunnableCommand` in cases like this where a job needs to run ends up effectively linking a logical plan into a physical plan, which has caused a few messy issues. For example, the problem where the Spark SQL tab doesn't show the entire operation and only shows the outer command without metrics. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139841458 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -159,6 +159,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.getExecutorsAliveOnHost(host).foreach { exec => killExecutors(exec.toSeq, replace = true, force = true) } + + case UpdateDelegationTokens(tokens) => +logDebug("Asking each executor to update HDFS delegation tokens") +for ((x, executorData) <- executorDataMap) { --- End diff -- Alternatively executorDataMap.values.foreach(...) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139841304 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -114,6 +115,16 @@ class IncrementalExecution( stateInfo = Some(nextStatefulOperationStateInfo), batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs), eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs)) + + case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) => +j.copy( + stateInfo = Some(nextStatefulOperationStateInfo), --- End diff -- That is the case **if** you don't change Spark versions. A more recent Spark version may include new optimizer rules that may change the ordering. Just something to think about. Would be nice to add a test with aggregation + join and join + aggregation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81954 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81954/testReport)** for PR 19196 at commit [`4eb7f4f`](https://github.com/apache/spark/commit/4eb7f4f6df3f2d5ae831bf15715651598e52c3e6). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19196#discussion_r139840990 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.util.UUID + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode} +import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange} +import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo} +import org.apache.spark.sql.test.SharedSQLContext + +class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLContext { + + import testImplicits._ + super.beforeAll() + + private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char") + + testEnsureStatefulOpPartitioning( +"ClusteredDistribution generates Exchange with HashPartitioning", +baseDf.queryExecution.sparkPlan, +requiredDistribution = keys => ClusteredDistribution(keys), +expectedPartitioning = + keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions), +expectShuffle = true) + + testEnsureStatefulOpPartitioning( +"ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning", +baseDf.coalesce(1).queryExecution.sparkPlan, +requiredDistribution = keys => ClusteredDistribution(keys), +expectedPartitioning = + keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions), +expectShuffle = true) + + testEnsureStatefulOpPartitioning( +"AllTuples generates Exchange with SinglePartition", +baseDf.queryExecution.sparkPlan, +requiredDistribution = _ => AllTuples, +expectedPartitioning = _ => SinglePartition, +expectShuffle = true) + + testEnsureStatefulOpPartitioning( +"AllTuples with coalesce(1) doesn't need Exchange", +baseDf.coalesce(1).queryExecution.sparkPlan, +requiredDistribution = _ => AllTuples, +expectedPartitioning = _ => SinglePartition, +expectShuffle = false) + + /** + * For `StatefulOperator` with the given `requiredChildDistribution`, and child SparkPlan + * `inputPlan`, ensures that the incremental planner adds exchanges, if required, in order to + * ensure the expected partitioning. 
+ */ + private def testEnsureStatefulOpPartitioning( + testName: String, + inputPlan: SparkPlan, + requiredDistribution: Seq[Attribute] => Distribution, + expectedPartitioning: Seq[Attribute] => Partitioning, + expectShuffle: Boolean): Unit = { +test(testName) { + val operator = TestStatefulOperator(inputPlan, requiredDistribution(inputPlan.output.take(1))) + val executed = executePlan(operator, OutputMode.Complete()) + if (expectShuffle) { +val exchange = executed.children.find(_.isInstanceOf[Exchange]) +if (exchange.isEmpty) { + fail(s"Was expecting an exchange but didn't get one in:\n$executed") +} +assert(exchange.get === + ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan), + s"Exchange didn't have expected properties:\n${exchange.get}") + } else { +assert(!executed.children.exists(_.isInstanceOf[Exchange]), + s"Unexpected exchange found in:\n$executed") + } +} + } + + /** Executes a SparkPlan using the IncrementalPlanner used for Structured
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139840306 --- Diff: python/pyspark/worker.py --- @@ -71,7 +73,19 @@ def wrap_udf(f, return_type): return lambda *a: f(*a) -def read_single_udf(pickleSer, infile): +def wrap_pandas_udf(f, return_type): +def verify_result_length(*a): +kwargs = a[-1] +result = f(*a[:-1], **kwargs) +if len(result) != kwargs["length"]: +raise RuntimeError("Result vector from pandas_udf was not the required length: " + "expected %d, got %d\nUse input vector length or kwarg['length']" + % (kwargs["length"], len(result))) +return result, toArrowType(return_type) --- End diff -- sure, that sounds good thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19270: [SPARK-21809] : Change Stage Page to use datatables to s...
Github user ajbozarth commented on the issue: https://github.com/apache/spark/pull/19270 On a second look I think I figured out my misunderstanding, and I've realized a thorough review will take quite a bit of time; I'll do my best to finish by the end of the week, but no promises. As for the MiMa failure, any change to a public API (even additions) must be added to the MiMa excludes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
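For context on the MiMa remark, this is roughly what such an exclude entry looks like in `project/MimaExcludes.scala`. The problem types are real MiMa filters, but the class and method names here are placeholders, not the actual changes from PR #19270.

```scala
import com.typesafe.tools.mima.core._

object MimaExcludesSketch {
  // Each intentional public-API change gets a filter so the MiMa binary-compatibility
  // check stops flagging it as an accidental break.
  lazy val excludes = Seq(
    ProblemFilters.exclude[DirectMissingMethodProblem](
      "org.apache.spark.status.api.v1.StageData.someRemovedOrRenamedMethod"),
    ProblemFilters.exclude[MissingClassProblem](
      "org.apache.spark.ui.jobs.SomeMovedHelperClass")
  )
}
```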
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81950/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81950 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81950/testReport)** for PR 19196 at commit [`8a6eafe`](https://github.com/apache/spark/commit/8a6eafef056b2a64ee0be07ce886ad69dc295537). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139839037 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java --- @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * A data writer returned by {@link WriteTask#createWriter(int, int, int)} and is responsible for + * writing data for an input RDD partition. + * + * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data + * source writers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source + * writers that mix in {@link SupportsWriteUnsafeRow}. + */ +@InterfaceStability.Evolving +public interface DataWriter { + + void write(T record); --- End diff -- What happens if this throws an exception? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139838908 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. For each partition, create a data writer with the write task, and write the data of the + * partition with this writer. If all the data are written successfully, call + * {@link DataWriter#commit()}. If exception happens during the writing, call + * {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed + * tasks. + * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If + * all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If + * some partitions failed and aborted, call {@link #abort()}. + * + * Note that, data sources are responsible for providing transaction ability by implementing the + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly. + * The transaction here is Spark-level transaction, which may not be the underlying storage + * transaction. For example, Spark successfully write data to a Cassandra data source, but + * Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a write task which will be serialized and sent to executors. For each partition of the + * input data(RDD), there will be one write task to write the records. + */ + WriteTask createWriteTask(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * all data writers for this writing job and are produced by {@link DataWriter#commit()}. This + * also means all the data are written successfully and all data writers are committed. 
+ */ + void commit(WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed to write the records and aborted. + */ + void abort(); --- End diff -- Should this accept the commit messages for committed tasks, or will tasks be aborted? I'm thinking of the case where you're writing to S3. Say a data source writes all attempt files to the final locations, then removes any attempts that are aborted. If the job aborts with some tasks that have already committed, then either this should have the option of cleaning up those files (passed in the commit message) or all of the tasks should be individually aborted. I'd prefer to have this abort clean up successful/committed tasks because the logic may be different. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
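To make the suggestion concrete, here is a rough sketch of the variant being proposed, where the job-level abort receives the commit messages of already-committed tasks so it can clean up their output (for example, files already moved to their final S3 locations). None of these types are the actual data source v2 API; they only mirror the shape under discussion.

```scala
// Hypothetical shapes, not the real v2 interfaces.
case class TaskCommitMessage(committedPaths: Seq[String])

trait FileStore {
  def delete(path: String): Unit
}

class CleanupOnAbortWriter(store: FileStore) {

  def commit(messages: Array[TaskCommitMessage]): Unit = {
    // Normal job commit: e.g. publish the collected file list atomically.
  }

  // Variant discussed above: abort is handed the messages of tasks that already committed,
  // so the job can remove their already-finalized files. This cleanup can differ from a
  // per-task abort, which only knows about its own attempt's files.
  def abort(committed: Array[TaskCommitMessage]): Unit = {
    committed.flatMap(_.committedPaths).foreach(store.delete)
  }
}
```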
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139838459 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. For each partition, create a data writer with the write task, and write the data of the + * partition with this writer. If all the data are written successfully, call + * {@link DataWriter#commit()}. If exception happens during the writing, call + * {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed + * tasks. + * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If + * all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If + * some partitions failed and aborted, call {@link #abort()}. + * + * Note that, data sources are responsible for providing transaction ability by implementing the + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly. + * The transaction here is Spark-level transaction, which may not be the underlying storage + * transaction. For example, Spark successfully write data to a Cassandra data source, but + * Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a write task which will be serialized and sent to executors. For each partition of the + * input data(RDD), there will be one write task to write the records. + */ + WriteTask createWriteTask(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * all data writers for this writing job and are produced by {@link DataWriter#commit()}. This + * also means all the data are written successfully and all data writers are committed. 
--- End diff -- I think this should state the guarantees when this method is called: * One and only one attempt for every task has committed successfully. * `messages` contains the commit message from every committed task attempt, which is no more than one per task. * All other attempts have been successfully aborted (is this a guarantee, or just that aborts have been attempted?) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19141 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81953/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19141 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19141 **[Test build #81953 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81953/testReport)** for PR 19141 at commit [`d2d13fe`](https://github.com/apache/spark/commit/d2d13fe82aec1c0fc43d688dbd315385ef99be19). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139836068 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. For each partition, create a data writer with the write task, and write the data of the --- End diff -- How does this handle speculative execution? This description makes it sound like attempts are only run serially. I'd like to have an interface that signals support for concurrent tasks, for data sources that act like the direct committer and can't handle speculation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139835816 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) +expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139835603 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. For each partition, create a data writer with the write task, and write the data of the + * partition with this writer. If all the data are written successfully, call + * {@link DataWriter#commit()}. If exception happens during the writing, call + * {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed + * tasks. + * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If + * all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If + * some partitions failed and aborted, call {@link #abort()}. + * + * Note that, data sources are responsible for providing transaction ability by implementing the + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly. + * The transaction here is Spark-level transaction, which may not be the underlying storage + * transaction. For example, Spark successfully write data to a Cassandra data source, but + * Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a write task which will be serialized and sent to executors. For each partition of the + * input data(RDD), there will be one write task to write the records. + */ + WriteTask createWriteTask(); --- End diff -- I think it's confusing to have only one write "task" that is serialized and used everywhere. It is implicitly copied by the serialization into multiple distinct tasks. Is there a better name for it? 
Maybe call the `DataWriter` the `WriteTask` and serialize something with a better name? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139835427 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) +expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #18754: [WIP][SPARK-21552][SQL] Add DecimalType support t...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18754#discussion_r139835312 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala --- @@ -224,6 +226,25 @@ private[arrow] class DoubleWriter(val valueVector: NullableFloat8Vector) extends } } +private[arrow] class DecimalWriter( +val valueVector: NullableDecimalVector, +precision: Int, +scale: Int) extends ArrowFieldWriter { + + override def valueMutator: NullableDecimalVector#Mutator = valueVector.getMutator() + + override def setNull(): Unit = { +valueMutator.setNull(count) + } + + override def setValue(input: SpecializedGetters, ordinal: Int): Unit = { +valueMutator.setIndexDefined(count) +val decimal = input.getDecimal(ordinal, precision, scale) +decimal.changePrecision(precision, scale) +DecimalUtility.writeBigDecimalToArrowBuf(decimal.toJavaBigDecimal, valueVector.getBuffer, count) --- End diff -- It was an issue with StringWriter, I put the fix in #19284 please take a look, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19141 **[Test build #81953 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81953/testReport)** for PR 19141 at commit [`d2d13fe`](https://github.com/apache/spark/commit/d2d13fe82aec1c0fc43d688dbd315385ef99be19). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139835014 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) +expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/19284 @ueshin and @icexelloss, this came up while testing with Arrow 0.7.0. It seems that when Spark gets row data as a UTF8String ByteBuffer, the data can start at an offset, which becomes the ByteBuffer position when this line is called: https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L171 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
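A small self-contained illustration of the position issue described above (plain java.nio, not the actual UTF8String/ArrowBuf code): when the string's bytes sit at an offset inside a larger backing array, the wrapped ByteBuffer has a non-zero position, and a copy that ignores position() reads the wrong bytes.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object BufferPositionSketch extends App {
  val backing = "row-header:hello".getBytes(StandardCharsets.UTF_8)
  val offset  = "row-header:".length            // the string "hello" starts at this offset

  // Wrap the backing array at the offset, as a UTF8String backed by a larger row buffer would be.
  val buf = ByteBuffer.wrap(backing, offset, backing.length - offset)

  // Wrong: assumes the data starts at index 0 of the backing array.
  val wrong = new Array[Byte](buf.remaining())
  System.arraycopy(backing, 0, wrong, 0, wrong.length)

  // Right: respect position()/remaining() when copying out of the buffer.
  val right = new Array[Byte](buf.remaining())
  buf.duplicate().get(right)

  println(new String(wrong, StandardCharsets.UTF_8))  // "row-h"  (wrong bytes)
  println(new String(right, StandardCharsets.UTF_8))  // "hello"
}
```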
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139834609

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
    @@ -0,0 +1,303 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus}
    +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
    +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
    +import org.apache.spark.sql.types._
    +import org.apache.spark.unsafe.types.CalendarInterval
    +
    +
    +/**
    + * Helper object for [[StreamingSymmetricHashJoinExec]].
    + */
    +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging {
    +
    +  sealed trait JoinSide
    +  case object LeftSide extends JoinSide { override def toString(): String = "left" }
    +  case object RightSide extends JoinSide { override def toString(): String = "right" }
    +
    +  sealed trait JoinStateWatermarkPredicate
    +  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate
    +  case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate
    +
    +  case class JoinStateWatermarkPredicates(
    +      left: Option[JoinStateWatermarkPredicate] = None,
    +      right: Option[JoinStateWatermarkPredicate] = None)
    +
    +  def getStateWatermarkPredicates(
    +      leftAttributes: Seq[Attribute],
    +      rightAttributes: Seq[Attribute],
    +      leftKeys: Seq[Expression],
    +      rightKeys: Seq[Expression],
    +      condition: Option[Expression],
    +      eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
    +    val joinKeyOrdinalForWatermark: Option[Int] = {
    +      leftKeys.zipWithIndex.collectFirst {
    +        case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index
    +      } orElse {
    +        rightKeys.zipWithIndex.collectFirst {
    +          case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index
    +        }
    +      }
    +    }
    +
    +    def getOneSideStateWatermarkPredicate(
    +        oneSideInputAttributes: Seq[Attribute],
    +        oneSideJoinKeys: Seq[Expression],
    +        otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = {
    +      val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey))
    +      val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined
    +
    +      if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs
    +        val keyExprWithWatermark = BoundReference(
    +          joinKeyOrdinalForWatermark.get,
    +          oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
    +          oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
    +        val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark)
    +        expr.map(JoinStateKeyWatermarkPredicate)
    +
    +      } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs
    +        val stateValueWatermark = getStateValueWatermark(
    +          attributesToFindStateWatemarkFor = oneSideInputAttributes,
    +          attributesWithEventWatermark = otherSideInputAttributes,
    +          condition,
    +          eventTimeWatermark)
    +        val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey))
    +        val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark)
    +        expr.map(JoinStat
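The joinKeyOrdinalForWatermark lookup in the quoted diff chains collectFirst with orElse to prefer a left-side key and fall back to the right side. A stand-alone Scala sketch of that pattern, using simplified stand-in types instead of Spark's catalyst expressions (all names here are illustrative only):

```scala
object WatermarkKeyLookupSketch {
  // Simplified stand-in for a catalyst expression that may carry watermark metadata.
  case class Key(name: String, hasWatermarkDelay: Boolean)

  // Return the ordinal of the first join key carrying the event-time watermark,
  // checking the left-side keys first and falling back to the right-side keys.
  def watermarkKeyOrdinal(leftKeys: Seq[Key], rightKeys: Seq[Key]): Option[Int] = {
    leftKeys.zipWithIndex.collectFirst {
      case (k, index) if k.hasWatermarkDelay => index
    } orElse {
      rightKeys.zipWithIndex.collectFirst {
        case (k, index) if k.hasWatermarkDelay => index
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val withWatermark = Seq(Key("id", hasWatermarkDelay = false), Key("ts", hasWatermarkDelay = true))
    val noWatermark   = Seq(Key("id", hasWatermarkDelay = false))

    println(watermarkKeyOrdinal(withWatermark, noWatermark)) // Some(1): found among the left keys
    println(watermarkKeyOrdinal(noWatermark, withWatermark)) // Some(1): falls back to the right keys
    println(watermarkKeyOrdinal(noWatermark, noWatermark))   // None: neither side carries a watermark
  }
}
```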
[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19141 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19269#discussion_r139834571

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java ---
    @@ -30,9 +30,8 @@
       /**
        * Creates a {@link DataSourceV2Reader} to scan the data from this data source.
        *
    -   * @param options the options for this data source reader, which is an immutable case-insensitive
    -   *                string-to-string map.
    -   * @return a reader that implements the actual read logic.
    +   * @param options the options for the returned data source reader, which is an immutable
    +   *                case-insensitive string-to-string map.
    --- End diff --

    It would make this much easier to review if changes to the read path were taken out and committed in a follow-up to #19136.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
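The quoted javadoc describes the options argument as an immutable, case-insensitive string-to-string map. A minimal Scala sketch of such a structure (illustrative only, not the actual DataSourceV2 options class):

```scala
// Immutable, case-insensitive string-to-string map: keys are normalized to lower
// case both on construction and on lookup, and the backing map is never mutated.
final class CaseInsensitiveOptions(entries: Map[String, String]) {
  private val normalized: Map[String, String] =
    entries.map { case (k, v) => k.toLowerCase -> v }

  def get(key: String): Option[String] = normalized.get(key.toLowerCase)
}

object CaseInsensitiveOptionsDemo extends App {
  val opts = new CaseInsensitiveOptions(Map("Path" -> "/tmp/data", "Format" -> "parquet"))
  println(opts.get("path"))   // Some(/tmp/data)
  println(opts.get("FORMAT")) // Some(parquet)
}
```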
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139834356

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19284 **[Test build #81952 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81952/testReport)** for PR 19284 at commit [`5ac572d`](https://github.com/apache/spark/commit/5ac572d3cb2422f57e101fa2cbc761f4b748daa6). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19284: [SPARK-22067][SQL] ArrowWriter should use positio...
GitHub user BryanCutler opened a pull request:

    https://github.com/apache/spark/pull/19284

    [SPARK-22067][SQL] ArrowWriter should use position when setting UTF8String ByteBuffer

    ## What changes were proposed in this pull request?

    The ArrowWriter StringWriter was setting Arrow data using a position of 0 instead of the actual position in the ByteBuffer. This was only working because of a bug, ARROW-1447, which has been fixed as of Arrow 0.7.0. Testing with that version revealed the error in the ArrowConvertersSuite string conversion test.

    ## How was this patch tested?

    Existing tests, manually verified working with Arrow 0.7.0.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/BryanCutler/spark arrow-ArrowWriter-StringWriter-position-SPARK-22067

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19284.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19284

commit 5ac572d3cb2422f57e101fa2cbc761f4b748daa6
Author: Bryan Cutler
Date: 2017-09-19T22:17:23Z

    pass in position of ByteBuffer when using StringWriter

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
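A minimal Scala sketch of the fix pattern this PR describes (the writeUtf8 helper is hypothetical, not the actual ArrowWriter StringWriter): the copy must start at the ByteBuffer's current position rather than a hard-coded 0.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object StringWriterPositionFix {
  // Hypothetical helper standing in for a columnar string writer: copies the
  // UTF-8 bytes that `buf` currently exposes into `out`.
  def writeUtf8(buf: ByteBuffer, out: ByteArrayOutputStream): Unit = {
    // Before the fix: out.write(buf.array(), 0, buf.remaining())
    // After the fix: start at the buffer's position, not at 0.
    out.write(buf.array(), buf.position(), buf.remaining())
  }

  def main(args: Array[String]): Unit = {
    val backing = "abcdef".getBytes(StandardCharsets.UTF_8)
    val buf = ByteBuffer.wrap(backing, 3, 3) // the value "def" starts at offset 3

    val out = new ByteArrayOutputStream()
    writeUtf8(buf, out)
    println(new String(out.toByteArray, StandardCharsets.UTF_8)) // prints "def"
  }
}
```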
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139833753

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139833518

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---