[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205133506

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    --- End diff --

    Added

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
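The expected offsets in the quoted `df3` follow directly from composing the four increments. As a minimal sketch, assuming plain Python functions as stand-ins for the UDFs (no Spark, and ignoring the scalar versus `pd.Series` distinction the real test asserts on), the arithmetic can be checked like this. The names `FUNCS`, `apply_chain`, and `COLUMNS` are hypothetical helpers, not part of the PR.

```python
# Plain-Python stand-ins for the four UDFs in the quoted test (no Spark involved).
def f1(x):
    return x + 1      # a row-at-a-time @udf in the test

def f2(x):
    return x + 10     # a @pandas_udf in the test

def f3(x):
    return x + 100    # a row-at-a-time @udf in the test

def f4(x):
    return x + 1000   # a @pandas_udf in the test

FUNCS = {"f1": f1, "f2": f2, "f3": f3, "f4": f4}

def apply_chain(name, v):
    """Evaluate a column name such as 'f4_f2_f1' as f4(f2(f1(v)))."""
    result = v
    # The innermost function is the last component of the name.
    for part in reversed(name.split("_")):
        result = FUNCS[part](result)
    return result

COLUMNS = [
    "f1", "f2", "f3", "f4",
    "f2_f1", "f3_f1", "f4_f1", "f3_f2", "f4_f2", "f4_f3",
    "f3_f2_f1", "f4_f2_f1", "f4_f3_f1", "f4_f3_f2",
    "f4_f3_f2_f1",
]

# spark.range(0, 1) yields a single row with v = 0.
expected = {name: apply_chain(name, 0) for name in COLUMNS}
```

Because every function only adds a constant, the order of composition does not change the result here; the real test is about whether the planner can evaluate chains that alternate between the two UDF kinds at all.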
[GitHub] spark issue #21803: [SPARK-24849][SPARK-24911][SQL] Converting a value of St...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21803 **[Test build #93540 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93540/testReport)** for PR 21803 at commit [`60f663d`](https://github.com/apache/spark/commit/60f663d7b12fcb3141eff774a9120f049d837112). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21803: [SPARK-24849][SPARK-24911][SQL] Converting a value of St...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21803 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93540/ Test PASSed.
[GitHub] spark issue #21803: [SPARK-24849][SPARK-24911][SQL] Converting a value of St...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21803 Merged build finished. Test PASSed.
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21584 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/1313/
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205141733

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v'])) \
    +            .withColumn('f2', f2(df['v'])) \
    +            .withColumn('f3', f3(df['v'])) \
    +            .withColumn('f1_f2', f1(f2(df['v']))) \
    +            .withColumn('f1_f3', f1(f3(df['v']))) \
    +
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21584 Kubernetes integration test status failure URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/1313/
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21584 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1313/ Test FAILed.
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21584 Merged build finished. Test FAILed.
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205144604

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v'])) \
    +            .withColumn('f2', f2(df['v'])) \
    +            .withColumn('f3', f3(df['v'])) \
    +            .withColumn('f1_f2', f1(f2(df['v']))) \
    +            .withColumn('f1_f3', f1(f3(df['v']))) \
    +
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205146857

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    --- End diff --

    hm looks messier than I thought .. previous one looks a bit better to me .. wdyt @BryanCutler ?
[GitHub] spark pull request #21867: [SPARK-24307][CORE] Add conf to revert to old cod...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21867#discussion_r205150784

    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -731,7 +733,14 @@ private[spark] class BlockManager(
           }
           if (data != null) {
    -        return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize))
    +        // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
    +        // to ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if
    --- End diff --

    oops, thanks for catching that. fixed
[GitHub] spark issue #21867: [SPARK-24307][CORE] Add conf to revert to old code.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21867 Merged build finished. Test PASSed.
[GitHub] spark issue #21867: [SPARK-24307][CORE] Add conf to revert to old code.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21867 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1314/ Test PASSed.
[GitHub] spark issue #21867: [SPARK-24307][CORE] Add conf to revert to old code.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21867 **[Test build #93548 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93548/testReport)** for PR 21867 at commit [`a5b00b8`](https://github.com/apache/spark/commit/a5b00b8a05538a6adb3a4525c2fecc1e15575f7c).
[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21451 **[Test build #93549 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93549/testReport)** for PR 21451 at commit [`fe31a7d`](https://github.com/apache/spark/commit/fe31a7d61ecabca76356b313211bb7b769e02b5b).
[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21451 Merged build finished. Test PASSed.
[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21451 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1315/ Test PASSed.
[GitHub] spark pull request #21869: [SPARK-24891][FOLLOWUP][HOT-FIX][2.3] Fix the Com...
Github user gatorsmile closed the pull request at: https://github.com/apache/spark/pull/21869
[GitHub] spark issue #21866: [SPARK-24768][FollowUp][SQL]Avro migration followup: cha...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21866 **[Test build #93541 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93541/testReport)** for PR 21866 at commit [`cff6f2a`](https://github.com/apache/spark/commit/cff6f2a0459e8cc4e48f28bde8103ea44ce5a1ab). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21596: [SPARK-24601] Bump Jackson version
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21596 @Fokko Let us wait for the code freeze of the Spark 2.4 release?
[GitHub] spark issue #21866: [SPARK-24768][FollowUp][SQL]Avro migration followup: cha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21866 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93541/ Test PASSed.
[GitHub] spark issue #21866: [SPARK-24768][FollowUp][SQL]Avro migration followup: cha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21866 Merged build finished. Test PASSed.
[GitHub] spark issue #21866: [SPARK-24768][FollowUp][SQL]Avro migration followup: cha...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21866 Thanks! Merged to master.
[GitHub] spark pull request #21866: [SPARK-24768][FollowUp][SQL]Avro migration follow...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21866
[GitHub] spark issue #21818: [SPARK-24860][SQL] Support setting of partitionOverWrite...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21818 Merged build finished. Test PASSed.
[GitHub] spark issue #21818: [SPARK-24860][SQL] Support setting of partitionOverWrite...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21818 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1316/ Test PASSed.
[GitHub] spark issue #21818: [SPARK-24860][SQL] Support setting of partitionOverWrite...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21818 **[Test build #93550 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93550/testReport)** for PR 21818 at commit [`03c0926`](https://github.com/apache/spark/commit/03c0926bed03bcedd035be3a743d08b664ec5006).
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user squito commented on the issue: https://github.com/apache/spark/pull/21698

Sorry, I got bogged down in some other things. Thanks for the responses:

>> on a fetch-failure in repartition, fail the entire job

> Currently I can't figure out a case that a customer may vote for this behavior change, esp. FetchFailure tends to occur more often on long-running jobs on big datasets compared to interactive queries.

Yeah, maybe you're right. I was thinking that maybe there comes a point where if you have one failure, you expect more failures on retries as well (in my experience, large shuffles often fail the first time when everything is getting fetched, but on subsequent retries they manage to succeed because the load is smaller). It might be better to just not bother retrying. But then again, there are situations where retry is fine, and I guess users won't know which one to choose.

>> since we only need to do this sort on RDDs post shuffle

> IIUC this is not the case in RDD.repartition(), see https://github.com/apache/spark/blob/94c67a76ec1fda908a671a47a2a1fa63b3ab1b06/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L453~L461 , it requires the input rows are ordered then perform a round-robin style data transformation, so I don't see what we can do if the input data type is not sortable.

My point is that if you serialize the input (the `Iterator[T]` there), then there is a well-defined ordering based on the serialized bytes. (I guess I'm assuming serialization is deterministic; I can't think of a case where that isn't true.) In general, you don't know that `T` is serializable, but after a shuffle you know it must be. So that gives you a way to always deterministically order the input after a shuffle, though at a pretty serious performance penalty. You could avoid the re-serialization overhead by pushing the sort down into ShuffleBlockFetcherIterator etc. Maybe you could skip this if you detect checkpointing or something equivalent which eliminates the ordering dependency ... or maybe that's just not possible with the current APIs.

Thanks for the description of the problem with deterministic shuffle ordering. The "Shuffle Merge With Spills" problem seems particularly hard to solve.
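To make the "order by serialized bytes" idea concrete, here is a minimal sketch in plain Python rather than Spark. It assumes a deterministic serializer (JSON with sorted keys stands in for whatever serializer the shuffle uses) and a simplified round-robin that always starts at partition 0; `serialize` and `round_robin_repartition` are hypothetical names, and none of this is Spark's actual implementation.

```python
import json

def serialize(record):
    # Assumption: serialization is deterministic. JSON with sorted keys is a
    # stand-in here; it is not the serializer Spark uses.
    return json.dumps(record, sort_keys=True).encode("utf-8")

def round_robin_repartition(records, num_partitions):
    """Sort by serialized bytes, then deal records out round-robin."""
    # Sorting on the bytes removes any dependence on arrival order.
    ordered = sorted(records, key=serialize)
    partitions = [[] for _ in range(num_partitions)]
    for i, record in enumerate(ordered):
        partitions[i % num_partitions].append(record)
    return partitions

# The same records arriving in two different orders, as could happen when a
# shuffle fetch is retried.
first_attempt = [{"id": 3, "v": "c"}, {"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
retry_attempt = [{"id": 2, "v": "b"}, {"id": 3, "v": "c"}, {"id": 1, "v": "a"}]

same_assignment = (
    round_robin_repartition(first_attempt, 2)
    == round_robin_repartition(retry_attempt, 2)
)
```

The sort is where the performance penalty mentioned above comes from: every record is serialized and compared before any output is produced.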
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21850#discussion_r205172569

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
    @@ -2813,4 +2813,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
           checkAnswer(df, Seq(Row(3, 99, 1)))
         }
       }
    +
    +  test("SPARK-24892: simplify `CaseWhen` to `If` when there is only one branch") {
    +    withTable("t") {
    +      Seq(Some(1), null, Some(3)).toDF("a").write.saveAsTable("t")
    +
    +      val plan1 = sql("select case when a is null then 1 end col1 from t")
    +      val plan2 = sql("select if(a is null, 1, null) col1 from t")
    +
    +      checkAnswer(plan1, Row(null) :: Row(1) :: Row(null) :: Nil)
    +      comparePlans(plan1.queryExecution.optimizedPlan, plan2.queryExecution.optimizedPlan)
    +    }
    --- End diff --

    Thank you for adding this higher level test, too.
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21850#discussion_r205177295

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
    @@ -414,6 +414,16 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
             // these branches can be pruned away
             val (h, t) = branches.span(_._1 != TrueLiteral)
             CaseWhen( h :+ t.head, None)
    +
    +      case CaseWhen(branches, elseValue) if branches.length == 1 =>
    +        // Using pattern matching like `CaseWhen((cond, branchValue) :: Nil, elseValue)` will not
    +        // work since the implementation of `branches` can be `ArrayBuffer`. A full test is in
    --- End diff --

    +1 for @ueshin 's suggestion. And, sorry for this trouble, @dbtsai . :)
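The rewrite under review can be illustrated with a toy model. The sketch below is plain Python, not Catalyst: `CaseWhen`, `If`, `simplify`, and `evaluate` are hypothetical stand-ins, predicates are ordinary Python callables, and SQL three-valued logic is ignored. It only shows that a one-branch `CaseWhen` and the corresponding `If` produce the same values for the three rows used in the quoted `SQLQuerySuite` test.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple

@dataclass(frozen=True)
class CaseWhen:
    # Each branch is a (predicate, value) pair; else_value of None models SQL NULL.
    branches: Tuple[Tuple[Callable[[Any], bool], Any], ...]
    else_value: Optional[Any] = None

@dataclass(frozen=True)
class If:
    predicate: Callable[[Any], bool]
    true_value: Any
    false_value: Optional[Any] = None

def simplify(expr):
    """Rewrite a single-branch CaseWhen into an If; leave anything else alone."""
    # Checking the length works for any sequence type holding the branches.
    if isinstance(expr, CaseWhen) and len(expr.branches) == 1:
        predicate, value = expr.branches[0]
        return If(predicate, value, expr.else_value)
    return expr

def evaluate(expr, a):
    """Evaluate either node type against a single column value."""
    if isinstance(expr, If):
        return expr.true_value if expr.predicate(a) else expr.false_value
    for predicate, value in expr.branches:
        if predicate(a):
            return value
    return expr.else_value

def is_null(a):
    return a is None

# Models: case when a is null then 1 end
case_when = CaseWhen(branches=((is_null, 1),))
rewritten = simplify(case_when)

rows = [1, None, 3]  # the values written to table t in the quoted test
before = [evaluate(case_when, a) for a in rows]
after = [evaluate(rewritten, a) for a in rows]
```

Both lists come out as `[None, 1, None]`, which lines up with the `Row(null) :: Row(1) :: Row(null)` answer the test checks.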
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21748#discussion_r205177861

    --- Diff: docs/running-on-kubernetes.md ---
    @@ -117,6 +117,45 @@ If the local proxy is running at localhost:8001, `--master k8s://http://127.0.0.
     spark-submit. Finally, notice that in the above example we specify a jar with a specific URI with a scheme of `local://`.
     This URI is the location of the example jar that is already in the Docker image.
    +## Client Mode
    +
    +Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When your application
    +runs in client mode, the driver can run inside a pod or on a physical host. When running an application in client mode,
    +it is recommended to account for the following factors:
    +
    +### Client Mode Networking
    +
    +Spark executors must be able to connect to the Spark driver over a hostname and a port that is routable from the Spark
    +executors. The specific network configuration that will be required for Spark to work in client mode will vary per
    +setup. If you run your driver inside a Kubernetes pod, you can use a
    +[headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your
    +driver pod to be routable from the executors by a stable hostname. When deploying your headless service, ensure that
    +the service's label selector will only match the driver pod and no other pods; it is recommended to assign your driver
    +pod a sufficiently unique label and to use that label in the label selector of the headless service. Specify the driver's
    +hostname via `spark.driver.host` and your spark driver's port to `spark.driver.port`.
    --- End diff --

    Yeah manual setup is fine for now. Think additional docs around how to do all this can be a separate PR.
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21748#discussion_r205178092

    --- Diff: docs/running-on-kubernetes.md ---
    @@ -117,6 +117,45 @@ If the local proxy is running at localhost:8001, `--master k8s://http://127.0.0.
     spark-submit. Finally, notice that in the above example we specify a jar with a specific URI with a scheme of `local://`.
     This URI is the location of the example jar that is already in the Docker image.
    +## Client Mode
    +
    +Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When your application
    +runs in client mode, the driver can run inside a pod or on a physical host. When running an application in client mode,
    +it is recommended to account for the following factors:
    +
    +### Client Mode Networking
    +
    +Spark executors must be able to connect to the Spark driver over a hostname and a port that is routable from the Spark
    +executors. The specific network configuration that will be required for Spark to work in client mode will vary per
    +setup. If you run your driver inside a Kubernetes pod, you can use a
    +[headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your
    +driver pod to be routable from the executors by a stable hostname. When deploying your headless service, ensure that
    +the service's label selector will only match the driver pod and no other pods; it is recommended to assign your driver
    +pod a sufficiently unique label and to use that label in the label selector of the headless service. Specify the driver's
    +hostname via `spark.driver.host` and your spark driver's port to `spark.driver.port`.
    +
    +### Client Mode Executor Pod Garbage Collection
    +
    +If you run your Spark driver in a pod, it is highly recommended to set `spark.driver.pod.name` to the name of that pod.
    +When this property is set, the Spark scheduler will deploy the executor pods with an
    +[OwnerReference](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/), which in turn will
    +ensure that once the driver pod is deleted from the cluster, all of the application's executor pods will also be deleted.
    +The driver will look for a pod with the given name in the namespace specified by `spark.kubernetes.namespace`, and
    +an OwnerReference pointing to that pod will be added to each executor pod's OwnerReferences list. Be careful to avoid
    +setting the OwnerReference to a pod that is not actually that driver pod, or else the executors may be terminated
    +prematurely when the wrong pod is deleted.
    +
    +If your application is not running inside a pod, or if `spark.driver.pod.name` is not set when your application is
    +actually running in a pod, keep in mind that the executor pods may not be properly deleted from the cluster when the
    +application exits. The Spark scheduler attempts to delete these pods, but if the network request to the API server fails
    +for any reason, these pods will remain in the cluster. The executor processes should exit when they cannot reach the
    +driver, so the executor pods should not consume compute resources (cpu and memory) in the cluster after your application
    --- End diff --

    Unclear, it triggers in the `onDisconnected` event so I think there's a persistent socket connection that's dropped that causes the exit. So, it should more or less be instantaneous.
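The properties named in the quoted documentation can be gathered into a small configuration sketch. The Python below only assembles `--conf key=value` arguments; it does not call Spark. The property names are copied as written in the quoted diff (the names in a released Spark version may differ), while the helper names, hostname, port, namespace, and pod name are all hypothetical values chosen for illustration.

```python
def client_mode_conf(driver_host, driver_port, namespace, driver_pod_name=None):
    """Collect the client-mode properties mentioned in the quoted docs."""
    conf = {
        # Hostname and port the executors use to reach the driver,
        # e.g. the stable name provided by a headless service.
        "spark.driver.host": driver_host,
        "spark.driver.port": str(driver_port),
        "spark.kubernetes.namespace": namespace,
    }
    if driver_pod_name is not None:
        # Property name as written in the quoted diff; only set it when the
        # driver really runs in that pod, since executors are owned by it.
        conf["spark.driver.pod.name"] = driver_pod_name
    return conf

def to_submit_args(conf):
    """Render the properties as spark-submit style --conf arguments."""
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args

# Hypothetical values for a driver running inside a pod.
conf = client_mode_conf(
    driver_host="spark-driver-svc.default.svc.cluster.local",
    driver_port=29413,
    namespace="default",
    driver_pod_name="spark-driver-pod",
)
submit_args = to_submit_args(conf)
```

Leaving `driver_pod_name` unset corresponds to the case the docs warn about, where executor pods may be left behind if the scheduler's delete request fails.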
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21748#discussion_r205178769 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala --- @@ -35,26 +35,39 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { -if (masterURL.startsWith("k8s") && - sc.deployMode == "client" && - !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) { - throw new SparkException("Client mode is currently not supported for Kubernetes.") -} - new TaskSchedulerImpl(sc) } override def createSchedulerBackend( sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { +val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK) +val (authConfPrefix, + apiServerUri, + defaultServiceAccountToken, + defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) { + require(sc.conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, +"If the application is deployed using spark-submit in cluster mode, the driver pod name " + + "must be provided.") + (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, +KUBERNETES_MASTER_INTERNAL_URL, +Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), +Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) +} else { + (KUBERNETES_AUTH_CLIENT_MODE_PREFIX, +masterURL.substring("k8s://".length()), --- End diff -- We can make such a helper function; currently this logic is done both here and in KubernetesClientApplication.
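The helper discussed in this comment could look roughly like the sketch below. It is written in Python for illustration only and is not Spark code. The name `api_server_uri` and the example URL are hypothetical. The prefix check is an added assumption, since the quoted Scala takes the substring unconditionally.

```python
K8S_MASTER_PREFIX = "k8s://"


def api_server_uri(master_url):
    """Strip the `k8s://` scheme from a master URL to get the API server URI."""
    if not master_url.startswith(K8S_MASTER_PREFIX):
        # Assumption: fail loudly instead of slicing an unexpected string.
        raise ValueError(f"master URL must start with {K8S_MASTER_PREFIX!r}: {master_url!r}")
    return master_url[len(K8S_MASTER_PREFIX):]


# Hypothetical master URL, for illustration only.
print(api_server_uri("k8s://https://example.com:6443"))
```

Keeping the prefix in one constant means both call sites mentioned in the comment would share a single definition of what a valid master URL looks like.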
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 OK, after the next build passes I'm going to merge immediately. Thanks for the review.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21748 **[Test build #93551 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93551/testReport)** for PR 21748 at commit [`ded1ff6`](https://github.com/apache/spark/commit/ded1ff6081da6f0b3879f6bf63b73caf01983bea).
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21821 LGTM.
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/21821 LGTM
[GitHub] spark pull request #21873: [SPARK-24919][BUILD] New linter rule for sparkCon...
GitHub user gengliangwang opened a pull request: https://github.com/apache/spark/pull/21873 [SPARK-24919][BUILD] New linter rule for sparkContext.hadoopConfiguration ## What changes were proposed in this pull request? In most cases, we should use `spark.sessionState.newHadoopConf()` instead of `sparkContext.hadoopConfiguration`, so that the Hadoop configurations specified in the Spark session configuration take effect. Add a rule matching `spark.sparkContext.hadoopConfiguration` or `spark.sqlContext.sparkContext.hadoopConfiguration` to prevent this usage. ## How was this patch tested? Unit test You can merge this pull request into a Git repository by running: $ git pull https://github.com/gengliangwang/spark linterRule Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21873.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21873 commit 8b5ad70c472ea4931dfd352b28044dd26bbed2a1 Author: Gengliang Wang Date: 2018-07-25T11:59:36Z new linter rule for spark.sparkContext.hadoopConfiguration
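The matching rule described in this pull request can be illustrated with a small standalone checker. This is only a sketch under assumptions: the regular expression below is written for this example and is not the pattern from the patch, and `find_violations` is a hypothetical name.

```python
import re

# Assumed pattern covering the two forms named in the PR description:
#   spark.sparkContext.hadoopConfiguration
#   spark.sqlContext.sparkContext.hadoopConfiguration
FORBIDDEN = re.compile(
    r"\bspark(?:\.sqlContext)?\.sparkContext\.hadoopConfiguration\b"
)


def find_violations(source):
    """Return (line_number, stripped_line) for every line using the discouraged call."""
    return [
        (number, line.strip())
        for number, line in enumerate(source.splitlines(), start=1)
        if FORBIDDEN.search(line)
    ]


sample = (
    "val a = spark.sparkContext.hadoopConfiguration\n"
    "val b = spark.sessionState.newHadoopConf()\n"
    "val c = spark.sqlContext.sparkContext.hadoopConfiguration\n"
)
for number, text in find_violations(sample):
    print(number, text)
```

The second sample line uses the recommended `spark.sessionState.newHadoopConf()` and is therefore not reported.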
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21748 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/1317/
[GitHub] spark issue #21873: [SPARK-24919][BUILD] New linter rule for sparkContext.ha...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21873 **[Test build #93552 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93552/testReport)** for PR 21873 at commit [`8b5ad70`](https://github.com/apache/spark/commit/8b5ad70c472ea4931dfd352b28044dd26bbed2a1).
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205185872 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { + +def isSet: Boolean = evalType >= 0 + +def set(evalType: Int): Unit = { + if (isSet) { +throw new IllegalStateException("Eval type has already been set") + } else { +this.evalType = evalType + } +} + +def get(): Int = { + if (!isSet) { +throw new IllegalStateException("Eval type is not set") + } else { +evalType + } +} + } + + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { -e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + /** + * Check whether a PythonUDF expression can be evaluated in Python. + * + * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar + * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the + * specified eval type. + * + * This method will also set the lazy eval type to be the type of the first evaluable expression, + * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval + * type will be set to the eval type of the expression. 
+ * + */ + private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { +if (!lazyEvalType.isSet) { + e.children match { +// single PythonUDF child could be chained and evaluated in Python if eval type is the same +case Seq(u: PythonUDF) => + // Need to recheck the eval type because lazy eval type will be set if child Python UDF is + // evaluable + canEvaluateInPython(u, lazyEvalType) && lazyEvalType.get == e.evalType +// Python UDF can't be evaluated directly in JVM +case children => if (!children.exists(hasScalarPythonUDF)) { + // We found the first evaluable expression, set lazy eval type to its eval type. + lazyEvalType.set(e.evalType) + true +} else { + false +} + } +} else { + if (e.evalType != lazyEvalType.get) { +false + } else { +e.children match { + case Seq(u: PythonUDF) => canEvaluateInPython(u, lazyEvalType) --- End diff -- There are 2 paths for recursion here, which is probably not a good idea. This method is much more complicated now and a little difficult to follow.
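To make the concern about two recursion paths concrete, here is one possible shape for a check with a single recursive call, written as a Python sketch. It is not the reviewer's suggestion, which does not appear in this thread, and it is not Spark code. `Expr`, `PythonUDF`, and the eval type numbers are hypothetical stand-ins for the Catalyst classes.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Expr:
    """Stand-in for a Catalyst expression; only the children matter here."""
    children: List["Expr"] = field(default_factory=list)


@dataclass
class PythonUDF(Expr):
    """Stand-in for a Python UDF node carrying its eval type."""
    eval_type: int = 0


def has_python_udf(e):
    """True if e or any of its descendants is a PythonUDF."""
    return isinstance(e, PythonUDF) or any(has_python_udf(c) for c in e.children)


def can_evaluate_in_python(e):
    """A chain of single-child UDFs is evaluable only if every link shares one eval type."""
    if len(e.children) == 1 and isinstance(e.children[0], PythonUDF):
        child = e.children[0]
        # The only recursive call: compare with the child, then descend.
        return e.eval_type == child.eval_type and can_evaluate_in_python(child)
    # Otherwise no child may contain a Python UDF at all.
    return not any(has_python_udf(c) for c in e.children)
```

Comparing each UDF with its direct child removes the need for mutable state such as `LazyEvalType`, at the cost of leaving it to the caller to group the evaluable UDFs by eval type afterwards.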
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21821 **[Test build #93553 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93553/testReport)** for PR 21821 at commit [`328addd`](https://github.com/apache/spark/commit/328adddc0c1870400e92934827150df2c98731f6).
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205186820 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { --- End diff -- I'm not too fond of the name `LazyEvalType`; it makes it sound like something else. Maybe `CurrentEvalType`?
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205186241 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,330 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = left.dataType + + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + isLongType: Boolean): Int = { +// store elements into resultArray +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val assigned = if (!isLongType) { + hsInt.add(array2.getInt(i)) +} else { + hsLong.add(array2.getLong(i)) +} + } + i += 1 +} +var pos = 0 +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + if 
(resultArray != null) { +resultArray.setNullAt(pos) + } + pos += 1 + notFoundNullElement = false +} + } else { +val assigned = if (!isLongType) { + assignInt(array1, i, resultArray, pos) +} else { + assignLong(array1, i, resultArray, pos) +} +if (assigned) { + pos += 1 +} + } + i += 1 +} +pos + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + hsInt = new OpenHashSet[Int] + val elements = evalIntLongPrimitiveType(array1, array2, null, false) + // allocate result array + hsInt = new OpenHashSet[Int] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +IntegerType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize) + } + // assign elements into the result array + evalIntLongPrimitiveType(array1, array2, resultArray, false) + resultArray +case LongType => + // avoid boxing of primitive long array elements + // calculate result array size + hsLong = new OpenHashSet[Long] + val elements = evalIntLongPrimitiveType(array1, array2, null, true) + // allocate result array + hsLong = new OpenHashSet[Long] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +LongType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize) + } +
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205186075 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,330 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = left.dataType + + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + isLongType: Boolean): Int = { +// store elements into resultArray +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val assigned = if (!isLongType) { + hsInt.add(array2.getInt(i)) +} else { + hsLong.add(array2.getLong(i)) +} + } + i += 1 +} +var pos = 0 +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + if 
(resultArray != null) { +resultArray.setNullAt(pos) + } + pos += 1 + notFoundNullElement = false +} + } else { +val assigned = if (!isLongType) { + assignInt(array1, i, resultArray, pos) +} else { + assignLong(array1, i, resultArray, pos) +} +if (assigned) { + pos += 1 +} + } + i += 1 +} +pos + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + hsInt = new OpenHashSet[Int] + val elements = evalIntLongPrimitiveType(array1, array2, null, false) + // allocate result array + hsInt = new OpenHashSet[Int] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +IntegerType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize) + } + // assign elements into the result array + evalIntLongPrimitiveType(array1, array2, resultArray, false) + resultArray +case LongType => + // avoid boxing of primitive long array elements + // calculate result array size + hsLong = new OpenHashSet[Long] + val elements = evalIntLongPrimitiveType(array1, array2, null, true) + // allocate result array + hsLong = new OpenHashSet[Long] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +LongType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize) + } +
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21748 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/1317/
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205186167 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,330 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = left.dataType + + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + isLongType: Boolean): Int = { +// store elements into resultArray +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val assigned = if (!isLongType) { + hsInt.add(array2.getInt(i)) +} else { + hsLong.add(array2.getLong(i)) +} + } + i += 1 +} +var pos = 0 +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + if 
(resultArray != null) { +resultArray.setNullAt(pos) + } + pos += 1 + notFoundNullElement = false +} + } else { +val assigned = if (!isLongType) { + assignInt(array1, i, resultArray, pos) +} else { + assignLong(array1, i, resultArray, pos) +} +if (assigned) { + pos += 1 +} + } + i += 1 +} +pos + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + hsInt = new OpenHashSet[Int] + val elements = evalIntLongPrimitiveType(array1, array2, null, false) + // allocate result array + hsInt = new OpenHashSet[Int] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +IntegerType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize) + } + // assign elements into the result array + evalIntLongPrimitiveType(array1, array2, resultArray, false) + resultArray +case LongType => + // avoid boxing of primitive long array elements + // calculate result array size + hsLong = new OpenHashSet[Long] + val elements = evalIntLongPrimitiveType(array1, array2, null, true) + // allocate result array + hsLong = new OpenHashSet[Long] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +LongType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize) + } +
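The semantics that the quoted `evalIntLongPrimitiveType` implements can be restated compactly. The sketch below is a Python illustration rather than the Spark implementation: it mirrors the quoted logic (values from `array2` are excluded, duplicates are dropped, at most one null is kept) but skips the two-pass sizing and the primitive hash sets. The function name is chosen for this example.

```python
def array_except(array1, array2):
    """Elements of array1 that are not in array2, without duplicates, in array1 order."""
    seen = set()
    null_handled = False
    for value in array2:
        if value is None:
            # A null in array2 suppresses nulls from array1.
            null_handled = True
        else:
            seen.add(value)
    result = []
    for value in array1:
        if value is None:
            if not null_handled:
                result.append(None)
                null_handled = True
        elif value not in seen:
            result.append(value)
            # Remember emitted values so that duplicates are dropped.
            seen.add(value)
    return result


print(array_except([1, 2, 3], [1, 3, 5]))  # [2], as in the ExpressionDescription example
```

The quoted Scala runs this loop twice, first with a null `resultArray` to count the output elements and then again to fill an array of exactly that size, which is why the hash set is recreated between the two calls.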
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21748 **[Test build #93551 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93551/testReport)** for PR 21748 at commit [`ded1ff6`](https://github.com/apache/spark/commit/ded1ff6081da6f0b3879f6bf63b73caf01983bea). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21748 Merged build finished. Test PASSed.
[GitHub] spark issue #21873: [SPARK-24919][BUILD] New linter rule for sparkContext.ha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21873 Merged build finished. Test PASSed.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21748 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1317/ Test PASSed.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21748 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93551/ Test PASSed.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21748 Merged build finished. Test PASSed.
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21821 Merged build finished. Test PASSed.
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21821 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93553/ Test FAILed.
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21821 Merged build finished. Test FAILed.
[GitHub] spark issue #21873: [SPARK-24919][BUILD] New linter rule for sparkContext.ha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21873 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1318/ Test PASSed.
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21821 **[Test build #93553 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93553/testReport)** for PR 21821 at commit [`328addd`](https://github.com/apache/spark/commit/328adddc0c1870400e92934827150df2c98731f6). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21821 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1319/ Test PASSed.
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205187569 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { + +def isSet: Boolean = evalType >= 0 + +def set(evalType: Int): Unit = { + if (isSet) { +throw new IllegalStateException("Eval type has already been set") + } else { +this.evalType = evalType + } +} + +def get(): Int = { + if (!isSet) { +throw new IllegalStateException("Eval type is not set") + } else { +evalType + } +} + } + + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { -e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + /** + * Check whether a PythonUDF expression can be evaluated in Python. + * + * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar + * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the + * specified eval type. + * + * This method will also set the lazy eval type to be the type of the first evaluable expression, + * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval + * type will be set to the eval type of the expression. + * + */ + private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { --- End diff -- The one method seems overly complicated, so I prefer the code from my suggestion. 
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r205187664 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -414,6 +414,16 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { // these branches can be pruned away val (h, t) = branches.span(_._1 != TrueLiteral) CaseWhen( h :+ t.head, None) + + case CaseWhen(branches, elseValue) if branches.length == 1 => +// Using pattern matching like `CaseWhen((cond, branchValue) :: Nil, elseValue)` will not +// work since the implementation of `branches` can be `ArrayBuffer`. A full test is in --- End diff -- @ueshin thanks! The code is much cleaner.
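The rewrite guarded by `branches.length == 1` can be sketched independently of Catalyst. The Python below is an illustration under assumptions: `CaseWhen` and `If` are hypothetical stand-ins for the Catalyst expressions, `None` stands in for a missing ELSE (which SQL evaluates to NULL), and the exact replacement built by the patch is not shown in the quoted diff.

```python
from dataclasses import dataclass
from typing import Any, Optional, Sequence, Tuple


@dataclass
class CaseWhen:
    branches: Sequence[Tuple[Any, Any]]  # (condition, value) pairs
    else_value: Optional[Any] = None


@dataclass
class If:
    predicate: Any
    true_value: Any
    false_value: Any


def simplify_single_branch(expr):
    """Rewrite a one-branch CaseWhen as an If; leave everything else untouched."""
    # Checking the length works for any sequence type, which is the point of the
    # quoted comment: matching on one concrete list shape would miss other
    # sequence implementations holding the same single branch.
    if isinstance(expr, CaseWhen) and len(expr.branches) == 1:
        condition, value = expr.branches[0]
        return If(condition, value, expr.else_value)
    return expr
```

Both a list and a tuple of branches take the same path here, which parallels the `ArrayBuffer` case that the Scala pattern `:: Nil` would not match.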
[GitHub] spark issue #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` when the...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21850 **[Test build #93554 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93554/testReport)** for PR 21850 at commit [`e2b0e96`](https://github.com/apache/spark/commit/e2b0e963bd48e3b9361be3d6291f7fcfca4afea7).
[GitHub] spark issue #21596: [SPARK-24601] Bump Jackson version
Github user Fokko commented on the issue: https://github.com/apache/spark/pull/21596 @gatorsmile Sure, just checking if it still works against recent master :)
[GitHub] spark issue #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` when the...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21850 Merged build finished. Test PASSed.
[GitHub] spark issue #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` when the...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21850 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1320/ Test PASSed.
[GitHub] spark issue #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21102 Just in my opinion, I'd prefer to preserve the element order of the left array.
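The ordering preference above can be illustrated with a minimal plain-Python sketch. This is not Spark's implementation: the function name `array_intersect_left_order` is hypothetical, and dropping duplicates from the result is an assumption about the intended semantics rather than something stated in this thread.

```python
def array_intersect_left_order(left, right):
    """Intersect two lists, keeping elements in the order they first appear in `left`.

    Hypothetical helper for illustration only; assumes hashable elements.
    """
    right_set = set(right)  # membership lookup for the right array
    seen = set()            # elements already emitted, to avoid duplicates
    result = []
    for item in left:
        if item in right_set and item not in seen:
            seen.add(item)
            result.append(item)
    return result
```

With this choice the output order depends only on the left argument, so swapping the arguments may change the order of the result but not its set of elements.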
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21403 **[Test build #93543 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93543/testReport)** for PR 21403 at commit [`bd008fe`](https://github.com/apache/spark/commit/bd008fe51f70f9925e9513680636f4dd9aadcd7c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21403 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93543/ Test PASSed.
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21403 Merged build finished. Test PASSed.
[GitHub] spark issue #21596: [SPARK-24601] Bump Jackson version
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21596 **[Test build #93544 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93544/testReport)** for PR 21596 at commit [`5742678`](https://github.com/apache/spark/commit/5742678da3ca9a900b9e54589d146f0a3f78541f). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21596: [SPARK-24601] Bump Jackson version
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21596 Merged build finished. Test FAILed.
[GitHub] spark issue #21596: [SPARK-24601] Bump Jackson version
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21596 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93544/ Test FAILed.
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205206127 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { + +def isSet: Boolean = evalType >= 0 + +def set(evalType: Int): Unit = { + if (isSet) { +throw new IllegalStateException("Eval type has already been set") + } else { +this.evalType = evalType + } +} + +def get(): Int = { + if (!isSet) { +throw new IllegalStateException("Eval type is not set") + } else { +evalType + } +} + } + + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { -e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + /** + * Check whether a PythonUDF expression can be evaluated in Python. + * + * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar + * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the + * specified eval type. + * + * This method will also set the lazy eval type to be the type of the first evaluable expression, + * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval + * type will be set to the eval type of the expression. 
+ * + */ + private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { --- End diff -- In your code:
```
private def canEvaluateInPython(e: PythonUDF, firstEvalType: FirstEvalType): Boolean = {
  if (firstEvalType.isEvalTypeSet() && e.evalType != firstEvalType.evalType) {
    false
  } else {
    firstEvalType.evalType = e.evalType
    e.children match {
      // single PythonUDF child could be chained and evaluated in Python
      case Seq(u: PythonUDF) => canEvaluateInPython(u, firstEvalType)
      // Python UDF can't be evaluated directly in JVM
      case children => !children.exists(hasScalarPythonUDF)
    }
  }
}
```
I think the confusing part here is that the value of `firstEvalType.evalType` keeps changing while we are traversing the tree, and we could be carrying the value across independent subtrees (i.e., after we finish traversing one subtree, `firstEvalType` can be set to Scalar Pandas even if we didn't find an evaluable UDF, and since we never reset it, we could get wrong results when we visit another subtree). The fact that the eval type keeps changing as we traverse the tree seems very error-prone to me.
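To make the concern concrete, here is a toy model in plain Python, not the actual Catalyst rule. The `Expr` class, the `BATCHED`/`PANDAS` codes, and the helper names are all hypothetical stand-ins; `can_evaluate` mirrors the shape of the quoted snippet, recording the eval type before it knows whether the UDF is evaluable.

```python
from dataclasses import dataclass, field
from typing import List, Optional

BATCHED, PANDAS = 100, 200  # hypothetical eval type codes, for illustration only


@dataclass
class Expr:
    name: str
    eval_type: Optional[int] = None  # None means "not a Python UDF"
    children: List["Expr"] = field(default_factory=list)


def has_udf(e: Expr) -> bool:
    """True if the expression or any descendant is a (toy) Python UDF."""
    return e.eval_type is not None or any(has_udf(c) for c in e.children)


class FirstEvalType:
    """Mutable holder shared across calls, as in the quoted snippet."""

    def __init__(self) -> None:
        self.eval_type: Optional[int] = None


def can_evaluate(e: Expr, first: FirstEvalType) -> bool:
    if first.eval_type is not None and e.eval_type != first.eval_type:
        return False
    first.eval_type = e.eval_type  # recorded before evaluability is known
    if len(e.children) == 1 and e.children[0].eval_type is not None:
        return can_evaluate(e.children[0], first)  # chained UDF child
    return not any(has_udf(c) for c in e.children)


v = Expr("v")
# A Pandas UDF over a non-UDF expression that hides a batched UDF: not evaluable.
blocked = Expr("p", PANDAS, [Expr("add", None, [Expr("b", BATCHED, [v]), v])])
# A batched UDF directly over a column: evaluable on its own.
simple = Expr("b2", BATCHED, [v])

shared = FirstEvalType()
r_blocked = can_evaluate(blocked, shared)        # False, but leaves PANDAS behind
r_stale = can_evaluate(simple, shared)           # False because of the stale PANDAS
r_fresh = can_evaluate(simple, FirstEvalType())  # True with a fresh holder
```

In this sketch the second call is rejected only because the first, unsuccessful call left its eval type in the shared holder, which is the cross-subtree leakage described above.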
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/21821 Is this still valid since https://github.com/apache/spark/pull/21822 is going on? Shall we have this only on 2.3 main branches?
[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21834 **[Test build #93542 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93542/testReport)** for PR 21834 at commit [`577f66e`](https://github.com/apache/spark/commit/577f66e24222190cd1d7c78b50bd7a2ba17189fe). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21834 Merged build finished. Test PASSed.
[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21834 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93542/ Test PASSed.
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21821 @mgaido91 See the comment https://github.com/apache/spark/pull/21821#issuecomment-407096818
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21821 Merged build finished. Test PASSed.
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21821 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1321/ Test PASSed.
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21821 **[Test build #93555 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93555/testReport)** for PR 21821 at commit [`ddbd9f7`](https://github.com/apache/spark/commit/ddbd9f7c796e8bedfbae3141c9c7098370c217ce).
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21821 This PR is mainly for the Spark 2.3 branch. The code changes will be removed from the master branch when https://github.com/apache/spark/pull/21822 is merged. However, the test cases will still be valid.
[GitHub] spark issue #21803: [SPARK-24849][SPARK-24911][SQL] Converting a value of St...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21803 LGTM Thanks! Merged to master
[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21650 **[Test build #93546 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93546/testReport)** for PR 21650 at commit [`2bc906d`](https://github.com/apache/spark/commit/2bc906de5a12dcc452e6855aa30d27021c446e17). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21650 Merged build finished. Test PASSed.
[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21650 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93546/ Test PASSed.
[GitHub] spark pull request #21803: [SPARK-24849][SPARK-24911][SQL] Converting a valu...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21803
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21748
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/21748 @mccheah the integration tests did not include the ClientModeTestsSuite. Can you add `with ClientModeTestsSuite`? Otherwise, the PRB doesn't actually test the client mode support accurately.
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21306 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1322/ Test PASSed.
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21306 Merged build finished. Test PASSed.
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21221 **[Test build #93557 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93557/testReport)** for PR 21221 at commit [`20799d2`](https://github.com/apache/spark/commit/20799d2af7b70334534be913f7defea6d6b79ffb).
[GitHub] spark issue #20405: [SPARK-23229][SQL] Dataset.hint should use planWithBarri...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20405 **[Test build #93559 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93559/testReport)** for PR 20405 at commit [`47bb245`](https://github.com/apache/spark/commit/47bb245353202208f2c41634c3796c8e4d2be663).
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21306 **[Test build #93556 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93556/testReport)** for PR 21306 at commit [`46100f3`](https://github.com/apache/spark/commit/46100f3fc7c51b86bbdc03fcc7d7b3388748f698).
[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20414 **[Test build #93558 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93558/testReport)** for PR 20414 at commit [`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).
[GitHub] spark pull request #21403: [SPARK-24341][SQL] Support only IN subqueries wit...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21403#discussion_r205213736 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -247,6 +249,20 @@ class Analyzer( } } + /** + * Substitutes In values with an instance of [[InValues]]. + */ + object ResolveInValues extends Rule[LogicalPlan] { +def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { --- End diff -- -> `plan.resolveOperators` Let us wait for https://github.com/apache/spark/pull/21822.
[GitHub] spark pull request #21403: [SPARK-24341][SQL] Support only IN subqueries wit...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21403#discussion_r205214601 --- Diff: sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/subq-input-typecheck.sql.out --- @@ -113,15 +105,7 @@ WHERE struct<> -- !query 8 output org.apache.spark.sql.AnalysisException -cannot resolve '(named_struct('t1a', t1.`t1a`, 't1b', t1.`t1b`) IN (listquery(t1.`t1a`)))' due to data type mismatch: -The number of columns in the left hand side of an IN subquery does not match the -number of columns in the output of subquery. -#columns in left hand side: 2. -#columns in right hand side: 1. -Left side columns: -[t1.`t1a`, t1.`t1b`]. -Right side columns: -[t2.`t2a`].; --- End diff -- Also output the message from lines 117 to 124.
[GitHub] spark pull request #21403: [SPARK-24341][SQL] Support only IN subqueries wit...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21403#discussion_r205214861 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2320,6 +2320,27 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(df.queryExecution.executedPlan.isInstanceOf[WholeStageCodegenExec]) } + test("SPARK-24341: IN subqueries with struct fields") { --- End diff -- Yes. Please move it there, if they are not duplicates.
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r205215698 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -414,6 +414,9 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { // these branches can be pruned away val (h, t) = branches.span(_._1 != TrueLiteral) CaseWhen( h :+ t.head, None) + + case CaseWhen((cond, branchValue) :: Nil, elseValue) => +If(cond, branchValue, elseValue.getOrElse(Literal(null, branchValue.dataType))) --- End diff -- Also, `CaseWhen` has an additional `project_project_value_0_0` on the outside.
```
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input_0;
/* 010 */   private int project_project_value_0_0;
```
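For readers following the rewrite itself, a rough plain-Python sketch of the single-branch `CaseWhen` to `If` simplification is below. It is a toy over tuples, not the Catalyst rule: `simplify_case_when` and the tuple encoding are hypothetical, and `None` stands in for the typed null literal that the Scala code builds with `Literal(null, branchValue.dataType)`.

```python
def simplify_case_when(branches, else_value=None):
    """Rewrite a one-branch CASE WHEN into an If node; leave other shapes alone.

    `branches` is any sequence of (condition, value) pairs. Checking the length
    instead of the concrete container type keeps the rule independent of
    whether the caller passed a list, a tuple, or another sequence.
    """
    if len(branches) == 1:
        cond, value = branches[0]
        # A missing ELSE becomes None here, standing in for SQL NULL.
        return ("If", cond, value, else_value)
    return ("CaseWhen", list(branches), else_value)
```

The length-based check is similar in spirit to the `branches.length == 1` guard discussed earlier in this thread for the case where `branches` is an `ArrayBuffer`.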