[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205187569 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { + +def isSet: Boolean = evalType >= 0 + +def set(evalType: Int): Unit = { + if (isSet) { +throw new IllegalStateException("Eval type has already been set") + } else { +this.evalType = evalType + } +} + +def get(): Int = { + if (!isSet) { +throw new IllegalStateException("Eval type is not set") + } else { +evalType + } +} + } + + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { -e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + /** + * Check whether a PythonUDF expression can be evaluated in Python. + * + * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar + * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the + * specified eval type. + * + * This method will also set the lazy eval type to be the type of the first evaluable expression, + * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval + * type will be set to the eval type of the expression. + * + */ + private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { --- End diff -- The one method seems overly complicated, so I prefer the code from my suggestion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21821 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1319/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21873: [SPARK-24919][BUILD] New linter rule for sparkContext.ha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21873 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1318/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21821 **[Test build #93553 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93553/testReport)** for PR 21821 at commit [`328addd`](https://github.com/apache/spark/commit/328adddc0c1870400e92934827150df2c98731f6). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21821 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21821 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93553/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21821 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21748 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21873: [SPARK-24919][BUILD] New linter rule for sparkContext.ha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21873 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21748 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1317/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21748 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93551/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21748 **[Test build #93551 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93551/testReport)** for PR 21748 at commit [`ded1ff6`](https://github.com/apache/spark/commit/ded1ff6081da6f0b3879f6bf63b73caf01983bea). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21748 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205186167 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,330 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = left.dataType + + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + isLongType: Boolean): Int = { +// store elements into resultArray +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val assigned = if (!isLongType) { + hsInt.add(array2.getInt(i)) +} else { + hsLong.add(array2.getLong(i)) +} + } + i += 1 +} +var pos = 0 +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + if (resultArray != null) { +resultArray.setNullAt(pos) + } + pos += 1 + notFoundNullElement = false +} + } else { +val assigned = if (!isLongType) { + assignInt(array1, i, resultArray, pos) +} else { + assignLong(array1, i, resultArray, pos) +} +if (assigned) { + pos += 1 +} + } + i += 1 +} +pos + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + hsInt = new OpenHashSet[Int] + val elements = evalIntLongPrimitiveType(array1, array2, null, false) + // allocate result array + hsInt = new OpenHashSet[Int] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +IntegerType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize) + } + // assign elements into the result array + evalIntLongPrimitiveType(array1, array2, resultArray, false) + resultArray +case LongType => + // avoid boxing of primitive long array elements + // calculate result array size + hsLong = new OpenHashSet[Long] + val elements = evalIntLongPrimitiveType(array1, array2, null, true) + // allocate result array + hsLong = new OpenHashSet[Long] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +LongType.defaultSize, elements)) { +new GenericArrayData(new 
Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize) + } +
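The quoted `ArrayExcept` diff above computes array_except for primitive int/long arrays in two passes (one pass with a null result array to size the output, a second pass to fill it), using an `OpenHashSet` to skip elements found in the second array and to drop duplicates. As a point of reference only, here is a minimal sketch of the intended semantics, not the primitive-specialized code under review; SQL NULL handling (the `notFoundNullElement` logic in the diff) is left out:

```scala
import scala.collection.mutable

// Reference semantics for array_except(array1, array2): the elements of array1 that do not
// appear in array2, without duplicates, in order of first occurrence in array1.
def arrayExcept[T](array1: Seq[T], array2: Seq[T]): Seq[T] = {
  val excluded = mutable.HashSet[T](array2: _*)
  val result = mutable.ArrayBuffer[T]()
  for (elem <- array1) {
    if (!excluded.contains(elem)) {
      excluded += elem // also de-duplicates repeated elements of array1
      result += elem
    }
  }
  result.toSeq
}

// Example: arrayExcept(Seq(1, 2, 3, 2), Seq(1, 3, 5)) == Seq(2)
```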
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21748 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/1317/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205186241 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,330 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = left.dataType + + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + isLongType: Boolean): Int = { +// store elements into resultArray +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val assigned = if (!isLongType) { + hsInt.add(array2.getInt(i)) +} else { + hsLong.add(array2.getLong(i)) +} + } + i += 1 +} +var pos = 0 +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + if (resultArray != null) { +resultArray.setNullAt(pos) + } + pos += 1 + notFoundNullElement = false +} + } else { +val assigned = if (!isLongType) { + assignInt(array1, i, resultArray, pos) +} else { + assignLong(array1, i, resultArray, pos) +} +if (assigned) { + pos += 1 +} + } + i += 1 +} +pos + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + hsInt = new OpenHashSet[Int] + val elements = evalIntLongPrimitiveType(array1, array2, null, false) + // allocate result array + hsInt = new OpenHashSet[Int] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +IntegerType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize) + } + // assign elements into the result array + evalIntLongPrimitiveType(array1, array2, resultArray, false) + resultArray +case LongType => + // avoid boxing of primitive long array elements + // calculate result array size + hsLong = new OpenHashSet[Long] + val elements = evalIntLongPrimitiveType(array1, array2, null, true) + // allocate result array + hsLong = new OpenHashSet[Long] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +LongType.defaultSize, elements)) { +new GenericArrayData(new 
Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize) + } +
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205186075 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,330 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = left.dataType + + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + isLongType: Boolean): Int = { +// store elements into resultArray +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val assigned = if (!isLongType) { + hsInt.add(array2.getInt(i)) +} else { + hsLong.add(array2.getLong(i)) +} + } + i += 1 +} +var pos = 0 +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + if (resultArray != null) { +resultArray.setNullAt(pos) + } + pos += 1 + notFoundNullElement = false +} + } else { +val assigned = if (!isLongType) { + assignInt(array1, i, resultArray, pos) +} else { + assignLong(array1, i, resultArray, pos) +} +if (assigned) { + pos += 1 +} + } + i += 1 +} +pos + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + hsInt = new OpenHashSet[Int] + val elements = evalIntLongPrimitiveType(array1, array2, null, false) + // allocate result array + hsInt = new OpenHashSet[Int] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +IntegerType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize) + } + // assign elements into the result array + evalIntLongPrimitiveType(array1, array2, resultArray, false) + resultArray +case LongType => + // avoid boxing of primitive long array elements + // calculate result array size + hsLong = new OpenHashSet[Long] + val elements = evalIntLongPrimitiveType(array1, array2, null, true) + // allocate result array + hsLong = new OpenHashSet[Long] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +LongType.defaultSize, elements)) { +new GenericArrayData(new 
Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize) + } +
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205186820 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { --- End diff -- I'm not too fond of the name `LazyEvalType`, makes it sound like something else. Maybe `CurrentEvalType`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21821 **[Test build #93553 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93553/testReport)** for PR 21821 at commit [`328addd`](https://github.com/apache/spark/commit/328adddc0c1870400e92934827150df2c98731f6). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205185872 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { + +def isSet: Boolean = evalType >= 0 + +def set(evalType: Int): Unit = { + if (isSet) { +throw new IllegalStateException("Eval type has already been set") + } else { +this.evalType = evalType + } +} + +def get(): Int = { + if (!isSet) { +throw new IllegalStateException("Eval type is not set") + } else { +evalType + } +} + } + + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { -e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + /** + * Check whether a PythonUDF expression can be evaluated in Python. + * + * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar + * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the + * specified eval type. + * + * This method will also set the lazy eval type to be the type of the first evaluable expression, + * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval + * type will be set to the eval type of the expression. + * + */ + private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { +if (!lazyEvalType.isSet) { + e.children match { +// single PythonUDF child could be chained and evaluated in Python if eval type is the same +case Seq(u: PythonUDF) => + // Need to recheck the eval type because lazy eval type will be set if child Python UDF is + // evaluable + canEvaluateInPython(u, lazyEvalType) && lazyEvalType.get == e.evalType +// Python UDF can't be evaluated directly in JVM +case children => if (!children.exists(hasScalarPythonUDF)) { + // We found the first evaluable expression, set lazy eval type to its eval type. + lazyEvalType.set(e.evalType) + true +} else { + false +} + } +} else { + if (e.evalType != lazyEvalType.get) { +false + } else { +e.children match { + case Seq(u: PythonUDF) => canEvaluateInPython(u, lazyEvalType) --- End diff -- There are 2 paths for recursion here, which is probably not a good idea. This method is much more complicated now and a little difficult to follow. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
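The complexity being discussed comes from threading a mutable `LazyEvalType` through two separate recursive call paths. Purely as an illustration of the simpler shape being asked for, and not necessarily the reviewer's actual suggestion, the chain check could be phrased as a single recursion that returns the eval type of an evaluable chain. The sketch below lives conceptually inside `ExtractPythonUDFs` and reuses `PythonUDF.evalType` and `hasScalarPythonUDF` exactly as they appear in the quoted diff:

```scala
// Sketch only. Returns the eval type of `e` if the whole chain of child PythonUDFs is
// evaluable in Python and shares a single eval type; returns None otherwise.
private def evaluableEvalType(e: PythonUDF): Option[Int] = e.children match {
  // A single PythonUDF child can be chained only if it is evaluable with the same eval type.
  case Seq(u: PythonUDF) =>
    evaluableEvalType(u).filter(_ == e.evalType)
  // Otherwise, evaluable only when no child contains another scalar Python UDF.
  case children if !children.exists(hasScalarPythonUDF) =>
    Some(e.evalType)
  case _ => None
}

// The caller decides whether the UDF matches the eval type already chosen for this batch.
private def canEvaluateInPython(e: PythonUDF, chosenEvalType: Option[Int]): Boolean =
  evaluableEvalType(e).exists(t => chosenEvalType.forall(_ == t))
```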
[GitHub] spark issue #21873: [SPARK-24919][BUILD] New linter rule for sparkContext.ha...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21873 **[Test build #93552 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93552/testReport)** for PR 21873 at commit [`8b5ad70`](https://github.com/apache/spark/commit/8b5ad70c472ea4931dfd352b28044dd26bbed2a1). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21748 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/1317/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21873: [SPARK-24919][BUILD] New linter rule for sparkCon...
GitHub user gengliangwang opened a pull request: https://github.com/apache/spark/pull/21873 [SPARK-24919][BUILD] New linter rule for sparkContext.hadoopConfiguration ## What changes were proposed in this pull request? In most cases, we should use `spark.sessionState.newHadoopConf()` instead of `sparkContext.hadoopConfiguration`, so that the hadoop configurations specified in Spark session configuration will come into effect. Add a rule matching `spark.sparkContext.hadoopConfiguration` or `spark.sqlContext.sparkContext.hadoopConfiguration` to prevent the usage. ## How was this patch tested? Unit test You can merge this pull request into a Git repository by running: $ git pull https://github.com/gengliangwang/spark linterRule Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21873.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21873 commit 8b5ad70c472ea4931dfd352b28044dd26bbed2a1 Author: Gengliang Wang Date: 2018-07-25T11:59:36Z new linter rule for spark.sparkContext.hadoopConfiguration --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
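The distinction the proposed rule enforces is easy to illustrate. A hedged sketch of the two patterns follows; it assumes Spark-internal code (where `sessionState` is accessible, since it is not public API) and an existing `SparkSession` named `spark`:

```scala
// Discouraged (what the new linter rule flags): reads the SparkContext-level Hadoop
// configuration directly, so Hadoop settings specified on the Spark session
// configuration do not take effect.
val hadoopConfFromContext = spark.sparkContext.hadoopConfiguration

// Preferred: builds a fresh Hadoop Configuration that also picks up the settings
// from the current session's configuration.
val hadoopConfFromSession = spark.sessionState.newHadoopConf()
```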
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/21821 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21821 LGTM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21748 **[Test build #93551 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93551/testReport)** for PR 21748 at commit [`ded1ff6`](https://github.com/apache/spark/commit/ded1ff6081da6f0b3879f6bf63b73caf01983bea). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Ok after the next build passes I'm going to merge immediately. Thanks for the review. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21748#discussion_r205178769 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala --- @@ -35,26 +35,39 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { -if (masterURL.startsWith("k8s") && - sc.deployMode == "client" && - !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) { - throw new SparkException("Client mode is currently not supported for Kubernetes.") -} - new TaskSchedulerImpl(sc) } override def createSchedulerBackend( sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { +val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK) +val (authConfPrefix, + apiServerUri, + defaultServiceAccountToken, + defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) { + require(sc.conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, +"If the application is deployed using spark-submit in cluster mode, the driver pod name " + + "must be provided.") + (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, +KUBERNETES_MASTER_INTERNAL_URL, +Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), +Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) +} else { + (KUBERNETES_AUTH_CLIENT_MODE_PREFIX, +masterURL.substring("k8s://".length()), --- End diff -- We can make such a helper function, currently this logic is done here and in KubernetesClientApplication --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
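Since the `k8s://` prefix stripping is duplicated here and in `KubernetesClientApplication`, the helper being discussed could look roughly like the following. This is a sketch only: the object name, placement, and the https default for a scheme-less URL are assumptions, not what was merged in this PR.

```scala
private[spark] object KubernetesUtils {
  // Strips the "k8s://" scheme from a Spark master URL and returns the API server URI,
  // e.g. "k8s://https://host:443" -> "https://host:443".
  // Assumption: a URL without an explicit scheme defaults to https.
  def parseMasterUrl(url: String): String = {
    require(url.startsWith("k8s://"), s"Master URL should start with k8s://, got: $url")
    val remainder = url.substring("k8s://".length)
    if (remainder.startsWith("http://") || remainder.startsWith("https://")) {
      remainder
    } else {
      s"https://$remainder"
    }
  }
}
```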
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21748#discussion_r205178092 --- Diff: docs/running-on-kubernetes.md --- @@ -117,6 +117,45 @@ If the local proxy is running at localhost:8001, `--master k8s://http://127.0.0. spark-submit. Finally, notice that in the above example we specify a jar with a specific URI with a scheme of `local://`. This URI is the location of the example jar that is already in the Docker image. +## Client Mode + +Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When your application +runs in client mode, the driver can run inside a pod or on a physical host. When running an application in client mode, +it is recommended to account for the following factors: + +### Client Mode Networking + +Spark executors must be able to connect to the Spark driver over a hostname and a port that is routable from the Spark +executors. The specific network configuration that will be required for Spark to work in client mode will vary per +setup. If you run your driver inside a Kubernetes pod, you can use a +[headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your +driver pod to be routable from the executors by a stable hostname. When deploying your headless service, ensure that +the service's label selector will only match the driver pod and no other pods; it is recommended to assign your driver +pod a sufficiently unique label and to use that label in the label selector of the headless service. Specify the driver's +hostname via `spark.driver.host` and your spark driver's port to `spark.driver.port`. + +### Client Mode Executor Pod Garbage Collection + +If you run your Spark driver in a pod, it is highly recommended to set `spark.driver.pod.name` to the name of that pod. +When this property is set, the Spark scheduler will deploy the executor pods with an +[OwnerReference](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/), which in turn will +ensure that once the driver pod is deleted from the cluster, all of the application's executor pods will also be deleted. +The driver will look for a pod with the given name in the namespace specified by `spark.kubernetes.namespace`, and +an OwnerReference pointing to that pod will be added to each executor pod's OwnerReferences list. Be careful to avoid +setting the OwnerReference to a pod that is not actually that driver pod, or else the executors may be terminated +prematurely when the wrong pod is deleted. + +If your application is not running inside a pod, or if `spark.driver.pod.name` is not set when your application is +actually running in a pod, keep in mind that the executor pods may not be properly deleted from the cluster when the +application exits. The Spark scheduler attempts to delete these pods, but if the network request to the API server fails +for any reason, these pods will remain in the cluster. The executor processes should exit when they cannot reach the +driver, so the executor pods should not consume compute resources (cpu and memory) in the cluster after your application --- End diff -- Unclear, it triggers in the `onDisconnected` event so I think there's a persistent socket connection that's dropped that causes the exit. So, it should more or less be instantaneous. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21748#discussion_r205177861 --- Diff: docs/running-on-kubernetes.md --- @@ -117,6 +117,45 @@ If the local proxy is running at localhost:8001, `--master k8s://http://127.0.0. spark-submit. Finally, notice that in the above example we specify a jar with a specific URI with a scheme of `local://`. This URI is the location of the example jar that is already in the Docker image. +## Client Mode + +Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When your application +runs in client mode, the driver can run inside a pod or on a physical host. When running an application in client mode, +it is recommended to account for the following factors: + +### Client Mode Networking + +Spark executors must be able to connect to the Spark driver over a hostname and a port that is routable from the Spark +executors. The specific network configuration that will be required for Spark to work in client mode will vary per +setup. If you run your driver inside a Kubernetes pod, you can use a +[headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your +driver pod to be routable from the executors by a stable hostname. When deploying your headless service, ensure that +the service's label selector will only match the driver pod and no other pods; it is recommended to assign your driver +pod a sufficiently unique label and to use that label in the label selector of the headless service. Specify the driver's +hostname via `spark.driver.host` and your spark driver's port to `spark.driver.port`. --- End diff -- Yeah manual setup is fine for now. Think additional docs around how to do all this can be a separate PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
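In practice, the manual setup described in the quoted docs boils down to pointing the driver at the stable, executor-routable hostname of the headless service. A hedged sketch of the relevant configuration; the master URL, service hostname, port, and image below are illustrative placeholders, not values taken from this PR:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Client-mode driver running in a pod that is fronted by a headless service.
val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.default.svc")
  .setAppName("client-mode-example")
  // Stable hostname provided by the headless service, routable from executors.
  .set("spark.driver.host", "spark-driver-svc.default.svc.cluster.local")
  .set("spark.driver.port", "7078")
  .set("spark.kubernetes.container.image", "example-registry/spark:2.4.0")

val spark = SparkSession.builder().config(conf).getOrCreate()
```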
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r205177295 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -414,6 +414,16 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { // these branches can be pruned away val (h, t) = branches.span(_._1 != TrueLiteral) CaseWhen( h :+ t.head, None) + + case CaseWhen(branches, elseValue) if branches.length == 1 => +// Using pattern matching like `CaseWhen((cond, branchValue) :: Nil, elseValue)` will not +// work since the implementation of `branches` can be `ArrayBuffer`. A full test is in --- End diff -- +1 for @ueshin 's suggestion. And, sorry for this trouble, @dbtsai . :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
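The comment in the quoted diff is worth unpacking: a cons pattern such as `(cond, branchValue) :: Nil` only matches when the scrutinee is actually a `List`, so it silently fails to fire when `branches` is backed by an `ArrayBuffer`. A minimal standalone sketch of the pitfall and of the length-guard form used in the rule instead:

```scala
import scala.collection.mutable.ArrayBuffer

val branches: Seq[(String, Int)] = ArrayBuffer(("cond", 1))

// Cons pattern: only matches List, so an ArrayBuffer-backed Seq falls through.
val viaCons = branches match {
  case (cond, value) :: Nil => s"matched $cond -> $value"
  case _                    => "no match"
}

// Length guard (the shape used in the optimizer rule): works for any Seq implementation.
val viaGuard = branches match {
  case bs if bs.length == 1 => s"matched ${bs.head._1} -> ${bs.head._2}"
  case _                    => "no match"
}

// viaCons  == "no match"
// viaGuard == "matched cond -> 1"
```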
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r205172569 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala --- @@ -2813,4 +2813,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer(df, Seq(Row(3, 99, 1))) } } + + test("SPARK-24892: simplify `CaseWhen` to `If` when there is only one branch") { +withTable("t") { + Seq(Some(1), null, Some(3)).toDF("a").write.saveAsTable("t") + + val plan1 = sql("select case when a is null then 1 end col1 from t") + val plan2 = sql("select if(a is null, 1, null) col1 from t") + + checkAnswer(plan1, Row(null) :: Row(1) :: Row(null) :: Nil) + comparePlans(plan1.queryExecution.optimizedPlan, plan2.queryExecution.optimizedPlan) +} --- End diff -- Thank you for adding this higher level test, too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user squito commented on the issue: https://github.com/apache/spark/pull/21698 sorry I got bogged down in some other things, thanks for the responses: >> on a fetch-failure in repartition, fail the entire job > Currently I can't figure out a case that a customer may vote for this behavior change, esp. FetchFailure tends to occur more often on long-running jobs on big datasets compared to interactive queries. yeah maybe you're right. I was thinking that maybe there comes a point where if you have one failure, you expect more failures on retries as well (in my experience, large shuffles often fail the first time when everything is getting fetched, but on subsequent retries they manage to succeed because the load is smaller). It might be better to just not bother retrying. But then again, there are situations where retry is fine, and I guess users won't know which one to choose. >> since we only need to do this sort on RDDs post shuffle > IIUC this is not the case in RDD.repartition(), see https://github.com/apache/spark/blob/94c67a76ec1fda908a671a47a2a1fa63b3ab1b06/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L453~L461 , it requires the input rows are ordered then perform a round-robin style data transformation, so I don't see what we can do if the input data type is not sortable. my point is that if you serialize the input (the `Iterator[T]` there), then there is a well-defined ordering based on the serialized bytes. (I guess I'm assuming serialization is deterministic, I can't think of a case that isn't true.) In general, you don't know that `T` is serializable, but after a shuffle you know it must be. So that gives you a way to always deterministically order the input after a shuffle, though at a pretty serious performance penalty. You could avoid the re-serialization overhead by pushing the sort down into ShuffleBlockFetcherIterator etc. Maybe you could skip this if you detect checkpointing or something equivalent which eliminates the ordering dependency ... or maybe that's just not possible with the current APIs. thanks for the description of the problem with deterministic shuffle ordering. The "Shuffle Merge With Spills" problem seems particularly hard to solve. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
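To make the "order by serialized bytes" idea concrete, here is a hedged sketch, not Spark code, of imposing a total order on post-shuffle elements before a round-robin repartition. It assumes serialization is deterministic, as the comment itself does, and uses plain Java serialization for illustration; the performance cost of re-serializing is exactly the penalty being discussed.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Sort an iterator's elements lexicographically by their serialized bytes. After a shuffle
// the element type is known to be serializable, which is what makes this ordering possible.
def sortBySerializedBytes[T](iter: Iterator[T]): Iterator[T] = {
  def toBytes(t: T): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(t.asInstanceOf[AnyRef])
    oos.close()
    bos.toByteArray
  }
  // Lexicographic, unsigned byte comparison.
  implicit val byteOrdering: Ordering[Array[Byte]] = Ordering.fromLessThan { (a, b) =>
    val len = math.min(a.length, b.length)
    var i = 0
    while (i < len && a(i) == b(i)) i += 1
    if (i < len) (a(i) & 0xff) < (b(i) & 0xff) else a.length < b.length
  }
  iter.toVector.sortBy(toBytes).iterator
}
```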
[GitHub] spark issue #21818: [SPARK-24860][SQL] Support setting of partitionOverWrite...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21818 **[Test build #93550 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93550/testReport)** for PR 21818 at commit [`03c0926`](https://github.com/apache/spark/commit/03c0926bed03bcedd035be3a743d08b664ec5006). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21818: [SPARK-24860][SQL] Support setting of partitionOverWrite...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21818 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1316/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21818: [SPARK-24860][SQL] Support setting of partitionOverWrite...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21818 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21866: [SPARK-24768][FollowUp][SQL]Avro migration follow...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21866 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21866: [SPARK-24768][FollowUp][SQL]Avro migration followup: cha...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21866 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21866: [SPARK-24768][FollowUp][SQL]Avro migration followup: cha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21866 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21866: [SPARK-24768][FollowUp][SQL]Avro migration followup: cha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21866 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93541/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21596: [SPARK-24601] Bump Jackson version
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21596 @Fokko Let us wait for the code freeze of the Spark 2.4 release? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21866: [SPARK-24768][FollowUp][SQL]Avro migration followup: cha...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21866 **[Test build #93541 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93541/testReport)** for PR 21866 at commit [`cff6f2a`](https://github.com/apache/spark/commit/cff6f2a0459e8cc4e48f28bde8103ea44ce5a1ab). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21869: [SPARK-24891][FOLLOWUP][HOT-FIX][2.3] Fix the Com...
Github user gatorsmile closed the pull request at: https://github.com/apache/spark/pull/21869 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21451 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1315/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21451 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21451 **[Test build #93549 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93549/testReport)** for PR 21451 at commit [`fe31a7d`](https://github.com/apache/spark/commit/fe31a7d61ecabca76356b313211bb7b769e02b5b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21867: [SPARK-24307][CORE] Add conf to revert to old code.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21867 **[Test build #93548 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93548/testReport)** for PR 21867 at commit [`a5b00b8`](https://github.com/apache/spark/commit/a5b00b8a05538a6adb3a4525c2fecc1e15575f7c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21867: [SPARK-24307][CORE] Add conf to revert to old code.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21867 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1314/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21867: [SPARK-24307][CORE] Add conf to revert to old code.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21867 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21867: [SPARK-24307][CORE] Add conf to revert to old cod...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21867#discussion_r205150784 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -731,7 +733,14 @@ private[spark] class BlockManager( } if (data != null) { -return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize)) +// SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to +// to ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if --- End diff -- oops, thanks for catching that. fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205146857 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { --- End diff -- hm looks messier than I thought .. previous one looks a bit better to me .. wdyt @BryanCutler ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205144604 --- Diff: python/pyspark/sql/tests.py --- @@ -5060,6 +5049,147 @@ def test_type_annotation(self): df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id')) self.assertEqual(df.first()[0], 0) +def test_mixed_udf(self): +import pandas as pd +from pyspark.sql.functions import col, udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of multiple UDFs and Pandas UDFs + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +@pandas_udf('int') +def f2(x): +assert type(x) == pd.Series +return x + 10 + +@udf('int') +def f3(x): +assert type(x) == int +return x + 100 + +@pandas_udf('int') +def f4(x): +assert type(x) == pd.Series +return x + 1000 + +# Test mixed udfs in a single projection +df1 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(col('f1'))) \ +.withColumn('f3_f1', f3(col('f1'))) \ +.withColumn('f4_f1', f4(col('f1'))) \ +.withColumn('f3_f2', f3(col('f2'))) \ +.withColumn('f4_f2', f4(col('f2'))) \ +.withColumn('f4_f3', f4(col('f3'))) \ +.withColumn('f3_f2_f1', f3(col('f2_f1'))) \ +.withColumn('f4_f2_f1', f4(col('f2_f1'))) \ +.withColumn('f4_f3_f1', f4(col('f3_f1'))) \ +.withColumn('f4_f3_f2', f4(col('f3_f2'))) \ +.withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1'))) + +# Test mixed udfs in a single expression +df2 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(f1(col('v' \ +.withColumn('f3_f1', f3(f1(col('v' \ +.withColumn('f4_f1', f4(f1(col('v' \ +.withColumn('f3_f2', f3(f2(col('v' \ +.withColumn('f4_f2', f4(f2(col('v' \ +.withColumn('f4_f3', f4(f3(col('v' \ +.withColumn('f3_f2_f1', f3(f2(f1(col('v') \ +.withColumn('f4_f2_f1', f4(f2(f1(col('v') \ +.withColumn('f4_f3_f1', f4(f3(f1(col('v') \ +.withColumn('f4_f3_f2', f4(f3(f2(col('v') \ +.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v')) + +# expected result +df3 = df \ +.withColumn('f1', df['v'] + 1) \ +.withColumn('f2', df['v'] + 10) \ +.withColumn('f3', df['v'] + 100) \ +.withColumn('f4', df['v'] + 1000) \ +.withColumn('f2_f1', df['v'] + 11) \ +.withColumn('f3_f1', df['v'] + 101) \ +.withColumn('f4_f1', df['v'] + 1001) \ +.withColumn('f3_f2', df['v'] + 110) \ +.withColumn('f4_f2', df['v'] + 1010) \ +.withColumn('f4_f3', df['v'] + 1100) \ +.withColumn('f3_f2_f1', df['v'] + 111) \ +.withColumn('f4_f2_f1', df['v'] + 1011) \ +.withColumn('f4_f3_f1', df['v'] + 1101) \ +.withColumn('f4_f3_f2', df['v'] + 1110) \ +.withColumn('f4_f3_f2_f1', df['v'] + ) + +self.assertEquals(df3.collect(), df1.collect()) +self.assertEquals(df3.collect(), df2.collect()) + +def test_mixed_udf_and_sql(self): +import pandas as pd +from pyspark.sql.functions import udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of UDFs, Pandas UDFs and SQL expression. + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +def f2(x): +return x + 10 + +@pandas_udf('int') +def f3(x): +assert type(x) == pd.Series +return x + 100 + +df1 = df.withColumn('f1', f1(df['v'])) \ +.withColumn('f2', f2(df['v'])) \ +.withColumn('f3', f3(df['v'])) \ +.withColumn('f1_f2', f1(f2(df['v']))) \ +.withColumn('f1_f3', f1(f3(df['v']))) \ +
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21584 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1313/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21584 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21584 Kubernetes integration test status failure URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/1313/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205141733 --- Diff: python/pyspark/sql/tests.py --- @@ -5060,6 +5049,147 @@ def test_type_annotation(self): df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id')) self.assertEqual(df.first()[0], 0) +def test_mixed_udf(self): +import pandas as pd +from pyspark.sql.functions import col, udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of multiple UDFs and Pandas UDFs + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +@pandas_udf('int') +def f2(x): +assert type(x) == pd.Series +return x + 10 + +@udf('int') +def f3(x): +assert type(x) == int +return x + 100 + +@pandas_udf('int') +def f4(x): +assert type(x) == pd.Series +return x + 1000 + +# Test mixed udfs in a single projection +df1 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(col('f1'))) \ +.withColumn('f3_f1', f3(col('f1'))) \ +.withColumn('f4_f1', f4(col('f1'))) \ +.withColumn('f3_f2', f3(col('f2'))) \ +.withColumn('f4_f2', f4(col('f2'))) \ +.withColumn('f4_f3', f4(col('f3'))) \ +.withColumn('f3_f2_f1', f3(col('f2_f1'))) \ +.withColumn('f4_f2_f1', f4(col('f2_f1'))) \ +.withColumn('f4_f3_f1', f4(col('f3_f1'))) \ +.withColumn('f4_f3_f2', f4(col('f3_f2'))) \ +.withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1'))) + +# Test mixed udfs in a single expression +df2 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(f1(col('v' \ +.withColumn('f3_f1', f3(f1(col('v' \ +.withColumn('f4_f1', f4(f1(col('v' \ +.withColumn('f3_f2', f3(f2(col('v' \ +.withColumn('f4_f2', f4(f2(col('v' \ +.withColumn('f4_f3', f4(f3(col('v' \ +.withColumn('f3_f2_f1', f3(f2(f1(col('v') \ +.withColumn('f4_f2_f1', f4(f2(f1(col('v') \ +.withColumn('f4_f3_f1', f4(f3(f1(col('v') \ +.withColumn('f4_f3_f2', f4(f3(f2(col('v') \ +.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v')) + +# expected result +df3 = df \ +.withColumn('f1', df['v'] + 1) \ +.withColumn('f2', df['v'] + 10) \ +.withColumn('f3', df['v'] + 100) \ +.withColumn('f4', df['v'] + 1000) \ +.withColumn('f2_f1', df['v'] + 11) \ +.withColumn('f3_f1', df['v'] + 101) \ +.withColumn('f4_f1', df['v'] + 1001) \ +.withColumn('f3_f2', df['v'] + 110) \ +.withColumn('f4_f2', df['v'] + 1010) \ +.withColumn('f4_f3', df['v'] + 1100) \ +.withColumn('f3_f2_f1', df['v'] + 111) \ +.withColumn('f4_f2_f1', df['v'] + 1011) \ +.withColumn('f4_f3_f1', df['v'] + 1101) \ +.withColumn('f4_f3_f2', df['v'] + 1110) \ +.withColumn('f4_f3_f2_f1', df['v'] + ) + +self.assertEquals(df3.collect(), df1.collect()) +self.assertEquals(df3.collect(), df2.collect()) + +def test_mixed_udf_and_sql(self): +import pandas as pd +from pyspark.sql.functions import udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of UDFs, Pandas UDFs and SQL expression. + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +def f2(x): +return x + 10 + +@pandas_udf('int') +def f3(x): +assert type(x) == pd.Series +return x + 100 + +df1 = df.withColumn('f1', f1(df['v'])) \ +.withColumn('f2', f2(df['v'])) \ +.withColumn('f3', f3(df['v'])) \ +.withColumn('f1_f2', f1(f2(df['v']))) \ +.withColumn('f1_f3', f1(f3(df['v']))) \ +
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21584 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/1313/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21803: [SPARK-24849][SPARK-24911][SQL] Converting a value of St...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21803 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21803: [SPARK-24849][SPARK-24911][SQL] Converting a value of St...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21803 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93540/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21803: [SPARK-24849][SPARK-24911][SQL] Converting a value of St...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21803 **[Test build #93540 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93540/testReport)** for PR 21803 at commit [`60f663d`](https://github.com/apache/spark/commit/60f663d7b12fcb3141eff774a9120f049d837112). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21403 Build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21650 **[Test build #93546 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93546/testReport)** for PR 21650 at commit [`2bc906d`](https://github.com/apache/spark/commit/2bc906de5a12dcc452e6855aa30d27021c446e17). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205133506 --- Diff: python/pyspark/sql/tests.py --- @@ -5060,6 +5049,147 @@ def test_type_annotation(self): df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id')) self.assertEqual(df.first()[0], 0) +def test_mixed_udf(self): +import pandas as pd +from pyspark.sql.functions import col, udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of multiple UDFs and Pandas UDFs + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +@pandas_udf('int') +def f2(x): +assert type(x) == pd.Series +return x + 10 + +@udf('int') +def f3(x): +assert type(x) == int +return x + 100 + +@pandas_udf('int') +def f4(x): +assert type(x) == pd.Series +return x + 1000 + +# Test mixed udfs in a single projection +df1 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(col('f1'))) \ +.withColumn('f3_f1', f3(col('f1'))) \ +.withColumn('f4_f1', f4(col('f1'))) \ +.withColumn('f3_f2', f3(col('f2'))) \ +.withColumn('f4_f2', f4(col('f2'))) \ +.withColumn('f4_f3', f4(col('f3'))) \ +.withColumn('f3_f2_f1', f3(col('f2_f1'))) \ +.withColumn('f4_f2_f1', f4(col('f2_f1'))) \ +.withColumn('f4_f3_f1', f4(col('f3_f1'))) \ +.withColumn('f4_f3_f2', f4(col('f3_f2'))) \ +.withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1'))) + +# Test mixed udfs in a single expression +df2 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(f1(col('v')))) \ +.withColumn('f3_f1', f3(f1(col('v')))) \ +.withColumn('f4_f1', f4(f1(col('v')))) \ +.withColumn('f3_f2', f3(f2(col('v')))) \ +.withColumn('f4_f2', f4(f2(col('v')))) \ +.withColumn('f4_f3', f4(f3(col('v')))) \ +.withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \ +.withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \ +.withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \ +.withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \ +.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v')))))) + +# expected result +df3 = df \ +.withColumn('f1', df['v'] + 1) \ +.withColumn('f2', df['v'] + 10) \ +.withColumn('f3', df['v'] + 100) \ +.withColumn('f4', df['v'] + 1000) \ +.withColumn('f2_f1', df['v'] + 11) \ +.withColumn('f3_f1', df['v'] + 101) \ +.withColumn('f4_f1', df['v'] + 1001) \ +.withColumn('f3_f2', df['v'] + 110) \ +.withColumn('f4_f2', df['v'] + 1010) \ +.withColumn('f4_f3', df['v'] + 1100) \ +.withColumn('f3_f2_f1', df['v'] + 111) \ +.withColumn('f4_f2_f1', df['v'] + 1011) \ +.withColumn('f4_f3_f1', df['v'] + 1101) \ +.withColumn('f4_f3_f2', df['v'] + 1110) \ +.withColumn('f4_f3_f2_f1', df['v'] + 1111) + +self.assertEquals(df3.collect(), df1.collect()) +self.assertEquals(df3.collect(), df2.collect()) + +def test_mixed_udf_and_sql(self): +import pandas as pd +from pyspark.sql.functions import udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of UDFs, Pandas UDFs and SQL expression. + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +def f2(x): --- End diff -- Added --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21403 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93539/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21584 **[Test build #93547 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93547/testReport)** for PR 21584 at commit [`d036673`](https://github.com/apache/spark/commit/d0366732a9ebef710cb78e52ee42a869002e9040). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21650 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1312/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205132748 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { + +def isSet: Boolean = evalType >= 0 + +def set(evalType: Int): Unit = { + if (isSet) { +throw new IllegalStateException("Eval type has already been set") + } else { +this.evalType = evalType + } +} + +def get(): Int = { + if (!isSet) { +throw new IllegalStateException("Eval type is not set") + } else { +evalType + } +} + } + + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { -e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + /** + * Check whether a PythonUDF expression can be evaluated in Python. + * + * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar + * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the + * specified eval type. + * + * This method will also set the lazy eval type to be the type of the first evaluable expression, + * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval + * type will be set to the eval type of the expression. + * + */ + private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { --- End diff -- @BryanCutler I rewrite this function using mutable state based on your suggestion. It's not quite the same as your code so please take a look and let me know if this looks better now. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
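A minimal, self-contained sketch of the "lazily fixed eval type" idea under discussion, assuming stand-in types and eval-type values rather than Spark's real PythonUDF class and eval-type constants: the first evaluable UDF fixes the eval type, and later UDFs are only collected in the same pass if they share it, so batched Python UDFs and Scalar Pandas UDFs never end up in one extraction.

    // Standalone Scala sketch; Udf and the eval-type numbers are illustrative stand-ins.
    object EvalTypeSketch {

      // Holds the eval type of the first evaluable UDF seen during one extraction pass.
      final class FirstEvalType {
        private var evalType: Option[Int] = None

        // True if `t` is the first eval type seen, or matches the one already recorded.
        def acceptOrSet(t: Int): Boolean = evalType match {
          case None => evalType = Some(t); true
          case Some(e) => e == t
        }
      }

      case class Udf(name: String, evalType: Int)

      // Collect only the UDFs that share the eval type of the first one encountered.
      def collectEvaluable(udfs: Seq[Udf]): Seq[Udf] = {
        val first = new FirstEvalType
        udfs.filter(u => first.acceptOrSet(u.evalType))
      }

      def main(args: Array[String]): Unit = {
        val udfs = Seq(Udf("f1", 100), Udf("f2", 200), Udf("f3", 100))
        // Prints List(f1, f3): f2 has a different eval type and waits for a later pass.
        println(collectEvaluable(udfs).map(_.name))
      }
    }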
[GitHub] spark pull request #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21584#discussion_r205132947 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala --- @@ -71,8 +75,9 @@ private[spark] class KubernetesDriverBuilder( case JavaMainAppResource(_) => provideJavaStep(kubernetesConf) case PythonMainAppResource(_) => - providePythonStep(kubernetesConf)} - .getOrElse(provideJavaStep(kubernetesConf)) + providePythonStep(kubernetesConf) +case RMainAppResource(_) => + provideRStep(kubernetesConf)}.getOrElse(provideJavaStep(kubernetesConf)) --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21650 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21867: [SPARK-24307][CORE] Add conf to revert to old code.
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/21867 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21403 **[Test build #93539 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93539/testReport)** for PR 21403 at commit [`0412829`](https://github.com/apache/spark/commit/04128292e6d145ec608166b532c960cac72a500c). * This patch passes all tests. * This patch **does not merge cleanly**. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21867: [SPARK-24307][CORE] Add conf to revert to old cod...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21867#discussion_r205130421 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -731,7 +733,14 @@ private[spark] class BlockManager( } if (data != null) { -return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize)) +// SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to +// to ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if --- End diff -- nit: `to ChunkedByteBuffer` -> `ChunkedByteBuffer` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205127465 --- Diff: python/pyspark/sql/tests.py --- @@ -5060,6 +5049,147 @@ def test_type_annotation(self): df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id')) self.assertEqual(df.first()[0], 0) +def test_mixed_udf(self): +import pandas as pd +from pyspark.sql.functions import col, udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of multiple UDFs and Pandas UDFs + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +@pandas_udf('int') +def f2(x): +assert type(x) == pd.Series +return x + 10 + +@udf('int') +def f3(x): +assert type(x) == int +return x + 100 + +@pandas_udf('int') +def f4(x): +assert type(x) == pd.Series +return x + 1000 + +# Test mixed udfs in a single projection +df1 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(col('f1'))) \ +.withColumn('f3_f1', f3(col('f1'))) \ +.withColumn('f4_f1', f4(col('f1'))) \ +.withColumn('f3_f2', f3(col('f2'))) \ +.withColumn('f4_f2', f4(col('f2'))) \ +.withColumn('f4_f3', f4(col('f3'))) \ +.withColumn('f3_f2_f1', f3(col('f2_f1'))) \ +.withColumn('f4_f2_f1', f4(col('f2_f1'))) \ +.withColumn('f4_f3_f1', f4(col('f3_f1'))) \ +.withColumn('f4_f3_f2', f4(col('f3_f2'))) \ +.withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1'))) + +# Test mixed udfs in a single expression +df2 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(f1(col('v')))) \ +.withColumn('f3_f1', f3(f1(col('v')))) \ +.withColumn('f4_f1', f4(f1(col('v')))) \ +.withColumn('f3_f2', f3(f2(col('v')))) \ +.withColumn('f4_f2', f4(f2(col('v')))) \ +.withColumn('f4_f3', f4(f3(col('v')))) \ +.withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \ +.withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \ +.withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \ +.withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \ +.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v')))))) + +# expected result +df3 = df \ +.withColumn('f1', df['v'] + 1) \ +.withColumn('f2', df['v'] + 10) \ +.withColumn('f3', df['v'] + 100) \ +.withColumn('f4', df['v'] + 1000) \ +.withColumn('f2_f1', df['v'] + 11) \ +.withColumn('f3_f1', df['v'] + 101) \ +.withColumn('f4_f1', df['v'] + 1001) \ +.withColumn('f3_f2', df['v'] + 110) \ +.withColumn('f4_f2', df['v'] + 1010) \ +.withColumn('f4_f3', df['v'] + 1100) \ +.withColumn('f3_f2_f1', df['v'] + 111) \ +.withColumn('f4_f2_f1', df['v'] + 1011) \ +.withColumn('f4_f3_f1', df['v'] + 1101) \ +.withColumn('f4_f3_f2', df['v'] + 1110) \ +.withColumn('f4_f3_f2_f1', df['v'] + 1111) + +self.assertEquals(df3.collect(), df1.collect()) +self.assertEquals(df3.collect(), df2.collect()) + +def test_mixed_udf_and_sql(self): +import pandas as pd +from pyspark.sql.functions import udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of UDFs, Pandas UDFs and SQL expression. + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +def f2(x): +return x + 10 + +@pandas_udf('int') +def f3(x): +assert type(x) == pd.Series +return x + 100 + +df1 = df.withColumn('f1', f1(df['v'])) \ +.withColumn('f2', f2(df['v'])) \ +.withColumn('f3', f3(df['v'])) \ +.withColumn('f1_f2', f1(f2(df['v']))) \ +.withColumn('f1_f3', f1(f3(df['v']))) \ +
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205127129 --- Diff: python/pyspark/sql/tests.py --- @@ -5060,6 +5049,147 @@ def test_type_annotation(self): df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id')) self.assertEqual(df.first()[0], 0) +def test_mixed_udf(self): +import pandas as pd +from pyspark.sql.functions import col, udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of multiple UDFs and Pandas UDFs + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +@pandas_udf('int') +def f2(x): +assert type(x) == pd.Series +return x + 10 + +@udf('int') +def f3(x): +assert type(x) == int +return x + 100 + +@pandas_udf('int') +def f4(x): +assert type(x) == pd.Series +return x + 1000 + +# Test mixed udfs in a single projection +df1 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(col('f1'))) \ +.withColumn('f3_f1', f3(col('f1'))) \ --- End diff -- Yeah, the way the test is written is that I am trying to test many combinations so some combinations might not be mixed UDF. Do you prefer that I remove these cases? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21867: [SPARK-24307][CORE] Add conf to revert to old code.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21867 **[Test build #93545 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93545/testReport)** for PR 21867 at commit [`1275c01`](https://github.com/apache/spark/commit/1275c0108cf753d2ce552c25164e2d3ce5460849). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205126592 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { --- End diff -- yeah I think it's fine not to support dynamic allocation in the initial version. I just think it would be better to have a failure right away if a user tries to use this with dynamic allocation, rather than some undefined behavior. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
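A hedged sketch of that "failure right away" idea: spark.dynamicAllocation.enabled is Spark's real switch, but the helper below and where it would be wired into the scheduler are assumptions for illustration, not something this PR adds.

    import org.apache.spark.{SparkConf, SparkException}

    object BarrierChecksSketch {
      // Assumed helper: reject a barrier stage up front when dynamic allocation is on,
      // instead of leaving the scheduling behavior undefined.
      def assertBarrierSupported(conf: SparkConf, isBarrierStage: Boolean): Unit = {
        val dynamicAllocation = conf.getBoolean("spark.dynamicAllocation.enabled", false)
        if (isBarrierStage && dynamicAllocation) {
          throw new SparkException(
            "Barrier execution mode does not support dynamic resource allocation; " +
              "disable spark.dynamicAllocation.enabled to run this stage.")
        }
      }
    }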
[GitHub] spark issue #21867: [SPARK-24307][CORE] Add conf to revert to old code.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21867 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1311/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21867: [SPARK-24307][CORE] Add conf to revert to old code.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21867 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21867: [SPARK-24307][CORE] Add conf to revert to old cod...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21867#discussion_r205124896 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -731,7 +731,14 @@ private[spark] class BlockManager( } if (data != null) { -return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize)) +// SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to +// to ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if +// new path is stable. +if (conf.getBoolean("spark.fetchToNioBuffer", false)) { --- End diff -- sure -- the fetch-to-disk conf is "spark.maxRemoteBlockSizeFetchToMem", which is why I stuck with just the "spark." prefix. Also, on second thought, I will make the rest of it more specific too, as there is a lot of "fetching" that this doesn't affect. How about "spark.network.remoteReadNioBufferConversion"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
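A sketch of how the renamed escape hatch could read at the call site, assuming the pre-SPARK-24307 path simply wrapped the whole remote block as one NIO ByteBuffer; only the proposed key name comes from the comment above, and it stays an undocumented setting.

    package org.apache.spark.storage

    import org.apache.spark.SparkConf
    import org.apache.spark.network.buffer.ManagedBuffer
    import org.apache.spark.util.io.ChunkedByteBuffer

    object RemoteReadSketch {
      // Simplified stand-in for the branch inside BlockManager.getRemoteBytes.
      def toChunkedByteBuffer(conf: SparkConf, data: ManagedBuffer, chunkSize: Int): ChunkedByteBuffer = {
        if (conf.getBoolean("spark.network.remoteReadNioBufferConversion", false)) {
          // old code path: materialize the remote block as a single NIO ByteBuffer
          new ChunkedByteBuffer(data.nioByteBuffer())
        } else {
          // new code path (SPARK-24307): wrap the buffer chunk by chunk, avoiding one large allocation
          ChunkedByteBuffer.fromManagedBuffer(data, chunkSize)
        }
      }
    }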
[GitHub] spark issue #21596: [SPARK-24601] Bump Jackson version
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21596 **[Test build #93544 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93544/testReport)** for PR 21596 at commit [`5742678`](https://github.com/apache/spark/commit/5742678da3ca9a900b9e54589d146f0a3f78541f). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21868 Thanks for the work, but we probably first need consensus before working on this, because this part is pretty performance-sensitive... As @viirya described in the JIRA, I think we need a more general approach than the current fix (for example, I'm not sure that this fix doesn't introduce any performance regression). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21403 **[Test build #93543 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93543/testReport)** for PR 21403 at commit [`bd008fe`](https://github.com/apache/spark/commit/bd008fe51f70f9925e9513680636f4dd9aadcd7c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21403 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1310/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21834 **[Test build #93542 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93542/testReport)** for PR 21834 at commit [`577f66e`](https://github.com/apache/spark/commit/577f66e24222190cd1d7c78b50bd7a2ba17189fe). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21403 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21834 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1309/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21834 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21871: [SPARK-24916][SQL] Fix type coercion for IN expression w...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21871 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93538/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21871: [SPARK-24916][SQL] Fix type coercion for IN expression w...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21871 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21871: [SPARK-24916][SQL] Fix type coercion for IN expression w...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21871 **[Test build #93538 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93538/testReport)** for PR 21871 at commit [`8ef142f`](https://github.com/apache/spark/commit/8ef142f78c22b980fe60d836c56d7d18d221a958). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205102656 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { --- End diff -- As listed in the design doc, support running barrier stage with dynamic resource allocation is Non-Goal of this task. However, we do plan to better integrate this feature with dynamic resource allocation, hopefully we can get to work on this before Spark 3.0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205100534 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag]( } } + /** + * :: Experimental :: + * Indicates that Spark must launch the tasks together for the current stage. + */ + @Experimental + @Since("2.4.0") + def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this)) --- End diff -- Em, thanks for raising this question. IMO we do require users to be aware of how many tasks they may launch for a barrier stage, and tasks may exchange internal data with each other in the middle, so users really care about the number of tasks. I agree it would be very useful to let users specify the number of tasks in a barrier stage; maybe we can have `RDDBarrier.coalesce(numPartitions: Int)` to enforce the number of tasks to be launched together in a barrier stage, as sketched below. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
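A hypothetical usage sketch of that suggestion: rdd.barrier() and BarrierTaskContext come from this PR, but coalesce on RDDBarrier does not exist, and the exact mapPartitions signature (here assumed to pass the BarrierTaskContext to the user function, as the diff's doc comment describes) may differ from what finally lands.

    import org.apache.spark.SparkContext

    object BarrierCoalesceSketch {
      def run(sc: SparkContext): Long = {
        val rdd = sc.parallelize(1 to 1000, numSlices = 16)
        rdd.barrier()                  // mark the stage as a barrier stage
          .coalesce(4)                 // hypothetical: force exactly 4 tasks to launch together
          .mapPartitions { (iter, context) =>
            context.barrier()          // all tasks in the stage wait here before proceeding
            iter
          }
          .count()
      }
    }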
[GitHub] spark pull request #21872: [WIP] merge upstream
Github user onursatici closed the pull request at: https://github.com/apache/spark/pull/21872 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21872: [WIP] merge upstream
GitHub user onursatici opened a pull request: https://github.com/apache/spark/pull/21872 [WIP] merge upstream ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/palantir/spark os/merge-upstream Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21872.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21872 commit a4697f7d4d5cbe31934f04c788c58c3fd598e0d5 Author: Robert Kruszewski Date: 2018-03-12T15:08:13Z v2 commit 48ecf0b688d00cbf57495dec9c338d479d8f55f3 Author: Robert Kruszewski Date: 2018-03-12T15:11:26Z more changes commit e7429d45e086fa82f5c214a74255232aedb727da Author: Dan Sanduleac Date: 2018-03-12T15:18:00Z Propagate hadoop exclusions to additional hadoop deps commit adeb886d94c0a018382f49340962500c1d3498c2 Author: Robert Kruszewski Date: 2018-03-12T16:21:17Z R tests commit 50e7e92cb29fd914d32b002dc96303360427aff4 Author: Robert Kruszewski Date: 2018-03-12T16:25:56Z nesting commit 2d3ef4a5199b401697d415520cf8aa9cd4a65862 Author: Dan Sanduleac Date: 2018-03-12T16:28:05Z Undo mistake commit 62deec487f2c48680eb86148abc5d818da696a26 Author: Robert Kruszewski Date: 2018-03-12T17:04:47Z maybe commit 9a4cf19c0070ce7d9bf7dfbde7f6646bb0fe2f1a Author: Dan Sanduleac Date: 2018-03-12T17:38:49Z Try to add better logging for test_conda commit 1b7a75835d3abcd7ba55504b56e269a3f061969a Author: Dan Sanduleac Date: 2018-03-12T17:39:39Z Comment out saving the build-sbt-* cache commit a10bdff90cae0fc593d9275dca6193b22e3a2c51 Author: Robert Kruszewski Date: 2018-03-12T18:06:59Z move r tests reporting commit 0ece2d6dfa4a21c2bc05de5b545ba52d5193234a Author: Dan Sanduleac Date: 2018-03-12T18:19:32Z Remove old cache restore commit 570e0e6536fd34a22059e4758ef38144f12d3413 Author: Dan Sanduleac Date: 2018-03-12T19:31:40Z Limit python parallelism in an attempt to reduce flakes commit 020adda18d642d27b3dd43b25260fb37ff6d4692 Author: Dan Sanduleac Date: 2018-03-12T20:24:04Z Make run_python_tests verbose so circle doesn't time it out commit e970d355f66bbe6df3da45213ddf06b797364d30 Author: Dan Sanduleac Date: 2018-03-12T21:25:38Z Use circleci machinery to split test classes by timings commit 87d3173a13c57d0f99bb1710c97e78db224e2ca2 Author: Dan Sanduleac Date: 2018-03-12T22:00:53Z Install unishark into python image commit 7eaafedb755740d9019eda20f6796c203b84e6c7 Author: Dan Sanduleac Date: 2018-03-12T22:01:06Z Run python tests using unishark commit ae3f14e11d7de4095174f09401e222659cf9e0ef Author: Dan Sanduleac Date: 2018-03-12T22:08:45Z don't expect every project to have an entry in circleTestsForProject commit 6e5b03c79a710159e883bcfcff42bad4cdd125b2 Author: Dan Sanduleac Date: 2018-03-13T12:03:45Z Also resolve the oldDeps project before saving ivy-dependency-cache commit 45cec9d9e3f97848acea84631824a22d5cd59d0f Author: Dan Sanduleac Date: 2018-03-13T12:32:47Z Revert "Also resolve the oldDeps project before saving ivy-dependency-cache" This reverts commit 6e5b03c79a710159e883bcfcff42bad4cdd125b2. 
commit 6c7809cec248bb778ad54117db85868222c38a3f Author: Dan Sanduleac Date: 2018-03-13T13:43:32Z Pipe test output for python/run-tests.py commit 487796d07e24cfa4534a243cbb7451e3397b7137 Author: Dan Sanduleac Date: 2018-03-13T16:00:09Z Try to set respectSessionTimeZone to fix python tests commit bb2076d1b80820fc9280af27c9a63c8ca2bbefb4 Author: Dan Sanduleac Date: 2018-03-13T18:46:46Z Revert "Try to set respectSessionTimeZone to fix python tests" This reverts commit 487796d commit 52cbda2e67cf37346f0ca7f90a2f5a8659447d1d Author: Dan Sanduleac Date: 2018-03-13T18:50:28Z Skip bad arrow tests for now - https://github.com/palantir/spark/issues/328 commit 51858bf187c53aefb123b4f56b7e63b63c544771 Author: Dan Sanduleac Date: 2018-03-13T19:04:27Z Make python tests output xml results in PYSPARK_PYTHON subdir commit 2620ab3fbd3d013a1070a5bd6b1c4250d5e4 Author: Dan Sanduleac Date: 2018-03-13T19:05:15Z Try to use unishark instead of xmlrunner in pyspark.streaming.tests too commit 77eaa80b4f8de046b20f14456bb7c9b5cb223016 Author: Dan Sanduleac Date: 2018-03-13T19:20:25Z Only the basename of PYSPARK_PYTHON please... commit
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205096607 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.reflect.ClassTag + +import org.apache.spark.BarrierTaskContext +import org.apache.spark.TaskContext +import org.apache.spark.annotation.{Experimental, Since} + +/** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */ +class RDDBarrier[T: ClassTag](rdd: RDD[T]) { + + /** + * :: Experimental :: + * Maps partitions together with a provided BarrierTaskContext. + * + * `preservesPartitioning` indicates whether the input function preserves the partitioner, which + * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys. + */ + @Experimental + @Since("2.4.0") + def mapPartitions[S: ClassTag]( --- End diff -- `RDDBarrier` is actually expected to be used like a builder; we shall provide more options for the barrier stage in the future, e.g. configuring a timeout for a barrier stage. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r205095575 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,215 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test stage executor metrics logging functionality. This checks that peak + * values from SparkListenerExecutorMetricsUpdate events during a stage are + * logged in a StageExecutorMetrics event for each executor at stage completion. + */ + private def testStageExecutorMetricsEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "stageExecutorMetrics-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected StageExecutorMetrics, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] = + Map( +((0, "1"), + new SparkListenerStageExecutorMetrics("1", 0, 0, + Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))), +((0, "2"), + new SparkListenerStageExecutorMetrics("2", 0, 0, + Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))), +((1, "1"), + new SparkListenerStageExecutorMetrics("1", 1, 0, + Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))), +((1, "2"), + new SparkListenerStageExecutorMetrics("2", 1, 0, + Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L + +// Events to post. +val events = Array( + SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + // receive 3 metric updates from each executor with just stage 0 running, + // with different peak updates for each executor + createExecutorMetricsUpdateEvent(1, + Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, + Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)), + // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6 + createExecutorMetricsUpdateEvent(1, + Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6 + createExecutorMetricsUpdateEvent(2, + Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)), + // exec 1: new stage 0 peaks for metrics at indexes: 5, 7 + createExecutorMetricsUpdateEvent(1, + Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8 + createExecutorMetricsUpdateEvent(2, + Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)), + // now start stage 1, one more metric update for each executor, and new + // peaks for some stage 1 metrics (as listed), initialize stage 1 peaks + createStageSubmittedEvent(1), + // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7 --- End diff -- Stage 0 is still running, and these are new peaks for that stage. It is also initializing all the stage 1 metric values, since these are the first executor metrics seen for stage 1 (I'll add this to the comments). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
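A minimal, self-contained sketch of the bookkeeping described here, using made-up names rather than the real EventLoggingListener internals: every metrics update is folded into the peaks of all stages currently running on that executor, which is why the first update after stage 1 is submitted both refreshes the stage-0 peaks and seeds the stage-1 peaks.

    import scala.collection.mutable

    object PeakTrackingSketch {
      // (stageId, executorId) -> running peak value for each metric
      private val peaks = mutable.Map.empty[(Int, String), Array[Long]]

      def onMetricsUpdate(activeStages: Seq[Int], execId: String, metrics: Array[Long]): Unit = {
        activeStages.foreach { stageId =>
          val current = peaks.getOrElseUpdate((stageId, execId), metrics.clone())
          metrics.indices.foreach(i => current(i) = math.max(current(i), metrics(i)))
        }
      }

      def main(args: Array[String]): Unit = {
        onMetricsUpdate(Seq(0), "1", Array(4000L, 50L))
        onMetricsUpdate(Seq(0, 1), "1", Array(7000L, 20L))  // stage 1 has just been submitted
        println(peaks((0, "1")).mkString(", "))  // 7000, 50 -> new stage-0 peak for the first metric
        println(peaks((1, "1")).mkString(", "))  // 7000, 20 -> stage-1 peaks initialized
      }
    }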
[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21834 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93537/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21834 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21834 **[Test build #93537 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93537/testReport)** for PR 21834 at commit [`1041a38`](https://github.com/apache/spark/commit/1041a38571eb4daf66a23d37d5bf51a1abb8d74c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21866: [SPARK-24768][FollowUp][SQL]Avro migration followup: cha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21866 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1308/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org