[spark] branch branch-3.0 updated: [SPARK-31854][SQL] Invoke in MapElementsExec should not propagate null
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new efa0269  [SPARK-31854][SQL] Invoke in MapElementsExec should not propagate null
efa0269 is described below

commit efa0269080cb7f6e2591caedcdac554beaf2661b
Author: Takeshi Yamamuro
AuthorDate: Mon Jun 1 04:50:00 2020 +

[SPARK-31854][SQL] Invoke in MapElementsExec should not propagate null

This PR intends to fix a bug of `Dataset.map` below when whole-stage codegen is enabled;
```
scala> val ds = Seq(1.asInstanceOf[Integer], null.asInstanceOf[Integer]).toDS()
scala> sql("SET spark.sql.codegen.wholeStage=true")
scala> ds.map(v => (v, v)).explain
== Physical Plan ==
*(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1.intValue AS _1#69, assertnotnull(input[0, scala.Tuple2, true])._2.intValue AS _2#70]
+- *(1) MapElements , obj#68: scala.Tuple2
   +- *(1) DeserializeToObject staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, value#1, true, false), obj#67: java.lang.Integer
      +- LocalTableScan [value#1]

// `AssertNotNull` in `SerializeFromObject` will fail;
scala> ds.map(v => (v, v)).show()
java.lang.NullPointerException: Null value appeared in non-nullable fails:
top level Product input object
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

// When whole-stage codegen is disabled, the query works well;
scala> sql("SET spark.sql.codegen.wholeStage=false")
scala> ds.map(v => (v, v)).show()
+----+----+
|  _1|  _2|
+----+----+
|   1|   1|
|null|null|
+----+----+
```

A root cause is that `Invoke` used in `MapElementsExec` propagates input nulls, and then [AssertNotNull](https://github.com/apache/spark/blob/1b780f364bfbb46944fe805a024bb6c32f5d2dde/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L253-L255) in `SerializeFromObject` fails because the top-level row becomes null. So, `MapElementsExec` should not return `null` but `(null, null)`.

NOTE: the generated code of the query above in the current master;
```
/* 033 */   private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
/* 034 */     boolean mapelements_isNull_1 = true;
/* 035 */     scala.Tuple2 mapelements_value_1 = null;
/* 036 */     if (!false) {
/* 037 */       mapelements_resultIsNull_0 = false;
/* 038 */
/* 039 */       if (!mapelements_resultIsNull_0) {
/* 040 */         mapelements_resultIsNull_0 = mapelements_exprIsNull_0_0;
/* 041 */         mapelements_mutableStateArray_0[0] = mapelements_expr_0_0;
/* 042 */       }
/* 043 */
/* 044 */       mapelements_isNull_1 = mapelements_resultIsNull_0;
/* 045 */       if (!mapelements_isNull_1) {
/* 046 */         Object mapelements_funcResult_0 = null;
/* 047 */         mapelements_funcResult_0 = ((scala.Function1) references[1] /* literal */).apply(mapelements_mutableStateArray_0[0]);
/* 048 */
/* 049 */         if (mapelements_funcResult_0 != null) {
/* 050 */           mapelements_value_1 = (scala.Tuple2) mapelements_funcResult_0;
/* 051 */         } else {
/* 052 */           mapelements_isNull_1 = true;
/* 053 */         }
/* 054 */
/* 055 */       }
/* 056 */     }
/* 057 */
/* 058 */     serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
/* 059 */
/* 060 */   }
```

The generated code w/ this fix;
```
/* 032 */   private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
/* 033 */     boolean mapelements_isNull_1 = true;
/* 034 */     scala.Tuple2 mapelements_value_1 = null;
/* 035 */     if (!false) {
/* 036 */       mapelements_mutableStateArray_0[0] = mapelements_expr_0_0;
/* 037 */
/* 038 */       mapelements_isNull_1 = false;
/* 039 */       if (!mapelements_isNull_1) {
/* 040 */         Object mapelements_funcResult_0 = null;
/* 041 */         mapelements_funcResult_0 = ((scala.Function1) references[1] /* literal */).apply(mapelements_mutableStateArray_0[0]);
/* 042 */
/* 043 */         if (mapelements_funcResult_0 != null) {
/* 044 */           mapelements_value_1 = (scala.Tuple2) mapelements_funcResult_0;
/* 045 */           mapelements_isNull_1 = false;
/* 046 */         } else {
/* 047 */           mapelements_isNull_1 = true;
/* 048 */         }
/* 049 */
```
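A regression check along these lines captures the intended behaviour. This is only a sketch written for a spark-shell session, not the exact test added to `DatasetSuite` by this commit:

```scala
// Sketch of a whole-stage-codegen regression check for SPARK-31854 (assumed
// to run in spark-shell, where `spark`, `sql` and the implicits are available).
import spark.implicits._

sql("SET spark.sql.codegen.wholeStage=true")
val ds = Seq(1.asInstanceOf[Integer], null.asInstanceOf[Integer]).toDS()

// With the fix, the null input element maps to a (null, null) tuple instead of
// a null top-level row, so AssertNotNull in SerializeFromObject no longer fails.
val rows = ds.map(v => (v, v)).collect().toSet
assert(rows == Set((Integer.valueOf(1), Integer.valueOf(1)), (null, null)))
```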
[spark] branch master updated (e694660 -> b806fc4)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from e694660  [SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic
     add b806fc4  [SPARK-31854][SQL] Invoke in MapElementsExec should not propagate null

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/sql/execution/objects.scala | 4 ++--
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala  | 10 ++
 2 files changed, 12 insertions(+), 2 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a5a8ec2  [SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic
a5a8ec2 is described below

commit a5a8ec2ca8e75207b4e6c7b76ab6214be4a4237e
Author: HyukjinKwon
AuthorDate: Mon Jun 1 09:45:21 2020 +0900

[SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic

### What changes were proposed in this pull request?

This PR proposes to make PySpark exceptions more Pythonic by hiding the JVM stacktrace by default. The JVM stacktrace can be shown again by turning on the `spark.sql.pyspark.jvmStacktrace.enabled` configuration.

```
Traceback (most recent call last):
  ...
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  ...
```

If `spark.sql.pyspark.jvmStacktrace.enabled` is enabled, it appends:

```
JVM stacktrace:
org.apache.spark.Exception: ...
  ...
```

For example, the code below:

```python
from pyspark.sql.functions import udf

@udf
def divide_by_zero(v):
    raise v / 0

spark.range(1).select(divide_by_zero("id")).show()
```

will show an error message that looks like a Python exception thrown locally.

Python exception message when spark.sql.pyspark.jvmStacktrace.enabled is off (default):

```
Traceback (most recent call last):
  File "", line 1, in
  File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco
    raise_from(converted)
  File "", line 3, in raise_from
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in
    return lambda *a: f(*a)
  File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "", line 3, in divide_by_zero
ZeroDivisionError: division by zero
```

Python exception message when spark.sql.pyspark.jvmStacktrace.enabled is on:

```
Traceback (most recent call last):
  File "", line 1, in
  File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 137, in deco
    raise_from(converted)
  File "", line 3, in raise_from
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in
```
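For completeness, the new flag is an ordinary runtime SQL configuration, so it can be toggled per session. A minimal sketch, shown in Scala for consistency with the other examples in this digest (the flag itself only changes how PySpark surfaces executor-side errors):

```scala
// Assumes a running SparkSession named `spark` (e.g. in spark-shell).
// Default is off: only the Pythonic error message is shown on the PySpark side.
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", "true")   // also append the JVM stacktrace
println(spark.conf.get("spark.sql.pyspark.jvmStacktrace.enabled"))  // prints "true"
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", "false")  // back to the default
```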
[spark] branch master updated (29c51d6 -> e694660)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 29c51d6  [SPARK-31788][CORE][DSTREAM][PYTHON] Recover the support of union for different types of RDD and DStreams
     add e694660  [SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/test_pandas_udf.py       | 14 +++---
 python/pyspark/sql/utils.py                       | 53 ++
 .../org/apache/spark/sql/internal/SQLConf.scala   | 11 +
 3 files changed, 61 insertions(+), 17 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-31788][CORE][DSTREAM][PYTHON] Recover the support of union for different types of RDD and DStreams
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new bd7f5da  [SPARK-31788][CORE][DSTREAM][PYTHON] Recover the support of union for different types of RDD and DStreams
bd7f5da is described below

commit bd7f5da3dfa0ce3edda0c9864cd0f89db744277f
Author: HyukjinKwon
AuthorDate: Mon Jun 1 09:43:03 2020 +0900

[SPARK-31788][CORE][DSTREAM][PYTHON] Recover the support of union for different types of RDD and DStreams

### What changes were proposed in this pull request?

This PR manually specifies the class for the input array being used in `(SparkContext|StreamingContext).union`. It fixes a regression introduced by SPARK-25737.

```python
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([6, 7, 8, 9, 10])
pairRDD1 = rdd1.zip(rdd2)
sc.union([pairRDD1, pairRDD1]).collect()
```

In the current master and `branch-3.0`:

```
Traceback (most recent call last):
  File "", line 1, in
  File "/.../spark/python/pyspark/context.py", line 870, in union
    jrdds[i] = rdds[i]._jrdd
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py", line 238, in __setitem__
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py", line 221, in __set_item
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 332, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling None.None. Trace:
py4j.Py4JException: Cannot convert org.apache.spark.api.java.JavaPairRDD to org.apache.spark.api.java.JavaRDD
	at py4j.commands.ArrayCommand.convertArgument(ArrayCommand.java:166)
	at py4j.commands.ArrayCommand.setArray(ArrayCommand.java:144)
	at py4j.commands.ArrayCommand.execute(ArrayCommand.java:97)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
```

which works in Spark 2.4.5:

```
[(1, 6), (2, 7), (3, 8), (4, 9), (5, 10), (1, 6), (2, 7), (3, 8), (4, 9), (5, 10)]
```

The code assumed the class of the input array is always `JavaRDD` or `JavaDStream`; however, it can be a different class such as `JavaPairRDD`. This fix is based on redsanket's initial approach, and will be co-authored.

### Why are the changes needed?

To fix a regression from Spark 2.4.5.

### Does this PR introduce _any_ user-facing change?

No, it's only in unreleased branches. This is to fix a regression.

### How was this patch tested?

Manually tested, and a unittest was added.

Closes #28648 from HyukjinKwon/SPARK-31788.

Authored-by: HyukjinKwon
Signed-off-by: HyukjinKwon
(cherry picked from commit 29c51d682b3735123f78cf9cb8610522a9bb86fd)
Signed-off-by: HyukjinKwon
---
 python/pyspark/context.py           | 18 --
 python/pyspark/streaming/context.py | 15 ---
 python/pyspark/tests/test_rdd.py    | 11 +++
 3 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index d5f1506..81b6caa 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -25,6 +25,7 @@ from threading import RLock
 from tempfile import NamedTemporaryFile
 
 from py4j.protocol import Py4JError
+from py4j.java_gateway import is_instance_of
 
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
@@ -864,8 +865,21 @@ class SparkContext(object):
         first_jrdd_deserializer = rdds[0]._jrdd_deserializer
         if any(x._jrdd_deserializer != first_jrdd_deserializer for x in rdds):
             rdds = [x._reserialize() for x in rdds]
-        cls = SparkContext._jvm.org.apache.spark.api.java.JavaRDD
-        jrdds = SparkContext._gateway.new_array(cls, len(rdds))
+        gw = SparkContext._gateway
+        jvm = SparkContext._jvm
+        jrdd_cls = jvm.org.apache.spark.api.java.JavaRDD
+        jpair_rdd_cls = jvm.org.apache.spark.api.java.JavaPairRDD
+        jdouble_rdd_cls = jvm.org.apache.spark.api.java.JavaDoubleRDD
+        if is_instance_of(gw, rdds[0]._jrdd, jrdd_cls):
+            cls = jrdd_cls
+        elif is_instance_of(gw, rdds[0]._jrdd, jpair_rdd_cls):
+            cls = jpair_rdd_cls
+        elif is_instance_of(gw, rdds[0]._jrdd, jdouble_rdd_cls):
+            cls = jdouble_rdd_cls
+        else:
+            cls_name = rdds[0]._jrdd.getClass().getCanonicalName()
+            raise TypeError("Unsupported Java RDD class %s" % cls_name)
+        jrdds = gw.new_array(cls, len(rdds))
         for i in range(0, len(rdds)):
             jrdds[i] = rdds[i]._jrdd
         return RDD(self._jsc.union(jrdds), self, rdds[0]._jrdd_deserializer)
diff --git a/python/pyspark/s
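The JVM-side constraint that py4j runs into here is that an array is created with a fixed component class, and only instances of that class can be stored in it. A tiny standalone sketch of that behaviour, using plain JVM reflection rather than the Spark or py4j APIs themselves:

```scala
import java.lang.reflect.Array

// An array created for component type String only accepts Strings; storing an
// Integer fails, much like storing a JavaPairRDD into a JavaRDD[] array would.
val arr = Array.newInstance(classOf[String], 2)
Array.set(arr, 0, "ok")                                    // fine
try Array.set(arr, 1, Integer.valueOf(1))                  // wrong element class
catch { case e: IllegalArgumentException => println(e) }
```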
[spark] branch master updated (45cf5e9 -> 29c51d6)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 45cf5e9  [SPARK-31840][ML] Add instance weight support in LogisticRegressionSummary
     add 29c51d6  [SPARK-31788][CORE][DSTREAM][PYTHON] Recover the support of union for different types of RDD and DStreams

No new revisions were added by this update.

Summary of changes:
 python/pyspark/context.py           | 18 --
 python/pyspark/streaming/context.py | 15 ---
 python/pyspark/tests/test_rdd.py    | 11 +++
 3 files changed, 39 insertions(+), 5 deletions(-)
[spark] branch master updated (47dc332 -> 45cf5e9)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 47dc332  [SPARK-31874][SQL] Use `FastDateFormat` as the legacy fractional formatter
     add 45cf5e9  [SPARK-31840][ML] Add instance weight support in LogisticRegressionSummary

No new revisions were added by this update.

Summary of changes:
 .../ml/classification/LogisticRegression.scala    | 99 +-
 .../classification/LogisticRegressionSuite.scala  | 61 +
 project/MimaExcludes.scala                        |  6 +-
 python/pyspark/ml/classification.py               | 11 +++
 4 files changed, 134 insertions(+), 43 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-31874][SQL] Use `FastDateFormat` as the legacy fractional formatter
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a53394f  [SPARK-31874][SQL] Use `FastDateFormat` as the legacy fractional formatter
a53394f is described below

commit a53394fda600279923c7ac7e61c1d763993e680b
Author: Max Gekk
AuthorDate: Sun May 31 13:05:00 2020 +

[SPARK-31874][SQL] Use `FastDateFormat` as the legacy fractional formatter

### What changes were proposed in this pull request?

1. Replace `SimpleDateFormat` with `FastDateFormat` as the legacy formatter of `FractionTimestampFormatter`.
2. Optimise `LegacyFastTimestampFormatter` for `java.sql.Timestamp` without a fractional part.

### Why are the changes needed?

1. By default, `HiveResult.hiveResultString` retrieves timestamp values as instances of `java.sql.Timestamp` and uses the legacy parser `SimpleDateFormat` to convert the timestamps to strings. After the fix https://github.com/apache/spark/pull/28024, the fractional formatter and its companion - the legacy formatter `SimpleDateFormat` - are created for every value. By switching from `LegacySimpleTimestampFormatter` to `LegacyFastTimestampFormatter`, we can utilize the internal cache of `F [...]
2. The second change in the method `def format(ts: Timestamp): String` of `LegacyFastTimestampFormatter` is needed to optimize the formatter for patterns without the fractional part and avoid conversions to microseconds.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

By existing tests in `TimestampFormatter`.

Closes #28678 from MaxGekk/fastdateformat-as-legacy-frac-formatter.

Authored-by: Max Gekk
Signed-off-by: Wenchen Fan
(cherry picked from commit 47dc332258bec20c460f666de50d9a8c5c0fbc0a)
Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/catalyst/util/TimestampFormatter.scala | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 32b4dcd..6d1c535 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -121,6 +121,7 @@ class FractionTimestampFormatter(zoneId: ZoneId)
     TimestampFormatter.defaultPattern,
     zoneId,
     TimestampFormatter.defaultLocale,
+    LegacyDateFormats.FAST_DATE_FORMAT,
     needVarLengthSecondFraction = false) {
 
   @transient
@@ -203,7 +204,11 @@ class LegacyFastTimestampFormatter(
   }
 
   override def format(ts: Timestamp): String = {
-    format(fromJavaTimestamp(ts))
+    if (ts.getNanos == 0) {
+      fastDateFormat.format(ts)
+    } else {
+      format(fromJavaTimestamp(ts))
+    }
   }
 
   override def format(instant: Instant): String = {
[spark] branch master updated: [SPARK-31874][SQL] Use `FastDateFormat` as the legacy fractional formatter
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 47dc332  [SPARK-31874][SQL] Use `FastDateFormat` as the legacy fractional formatter
47dc332 is described below

commit 47dc332258bec20c460f666de50d9a8c5c0fbc0a
Author: Max Gekk
AuthorDate: Sun May 31 13:05:00 2020 +

[SPARK-31874][SQL] Use `FastDateFormat` as the legacy fractional formatter

### What changes were proposed in this pull request?

1. Replace `SimpleDateFormat` with `FastDateFormat` as the legacy formatter of `FractionTimestampFormatter`.
2. Optimise `LegacyFastTimestampFormatter` for `java.sql.Timestamp` without a fractional part.

### Why are the changes needed?

1. By default, `HiveResult.hiveResultString` retrieves timestamp values as instances of `java.sql.Timestamp` and uses the legacy parser `SimpleDateFormat` to convert the timestamps to strings. After the fix https://github.com/apache/spark/pull/28024, the fractional formatter and its companion - the legacy formatter `SimpleDateFormat` - are created for every value. By switching from `LegacySimpleTimestampFormatter` to `LegacyFastTimestampFormatter`, we can utilize the internal cache of `F [...]
2. The second change in the method `def format(ts: Timestamp): String` of `LegacyFastTimestampFormatter` is needed to optimize the formatter for patterns without the fractional part and avoid conversions to microseconds.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

By existing tests in `TimestampFormatter`.

Closes #28678 from MaxGekk/fastdateformat-as-legacy-frac-formatter.

Authored-by: Max Gekk
Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/catalyst/util/TimestampFormatter.scala | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 8428964..3e302e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -121,6 +121,7 @@ class FractionTimestampFormatter(zoneId: ZoneId)
     TimestampFormatter.defaultPattern,
     zoneId,
     TimestampFormatter.defaultLocale,
+    LegacyDateFormats.FAST_DATE_FORMAT,
     needVarLengthSecondFraction = false) {
 
   @transient
@@ -224,7 +225,11 @@ class LegacyFastTimestampFormatter(
   }
 
   override def format(ts: Timestamp): String = {
-    format(fromJavaTimestamp(ts))
+    if (ts.getNanos == 0) {
+      fastDateFormat.format(ts)
+    } else {
+      format(fromJavaTimestamp(ts))
+    }
   }
 
   override def format(instant: Instant): String = {
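For context, `FastDateFormat` (from Apache Commons Lang 3) is thread-safe and caches instances per pattern, which is what makes it a better fit than creating a `SimpleDateFormat` per value. A small standalone sketch of the kind of formatting involved (illustrative only; it does not go through Spark's internal `TimestampFormatter`):

```scala
import java.sql.Timestamp
import org.apache.commons.lang3.time.FastDateFormat

// getInstance returns a cached, thread-safe formatter for the given pattern,
// so repeated calls with the same pattern do not re-create the formatter.
val legacyFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

// A java.sql.Timestamp with no fractional part (getNanos == 0) can be formatted
// directly as a java.util.Date, skipping any conversion to microseconds first.
val ts = Timestamp.valueOf("2020-05-31 13:05:00")
println(legacyFormat.format(ts))  // 2020-05-31 13:05:00
```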
[spark] branch branch-3.0 updated: [SPARK-31867][SQL] Disable year type datetime patterns which are longer than 10
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new dc0d12a  [SPARK-31867][SQL] Disable year type datetime patterns which are longer than 10
dc0d12a is described below

commit dc0d12ac3ebef7c11b89f890757ebbbd5adeecfc
Author: Kent Yao
AuthorDate: Sun May 31 12:34:39 2020 +

[SPARK-31867][SQL] Disable year type datetime patterns which are longer than 10

As mentioned in https://github.com/apache/spark/pull/28673 and suggested by cloud-fan at https://github.com/apache/spark/pull/28673#discussion_r432817075, in this PR we disable datetime patterns in the form of `y..y` and `Y..Y` whose lengths are greater than 10, to avoid a JDK bug described below.

The new datetime formatter introduces a silent data change like,

```sql
spark-sql> select from_unixtime(1, 'yyy-MM-dd');
NULL
spark-sql> set spark.sql.legacy.timeParserPolicy=legacy;
spark.sql.legacy.timeParserPolicy	legacy
spark-sql> select from_unixtime(1, 'yyy-MM-dd');
0001970-01-01
spark-sql>
```

For patterns that support `SignStyle.EXCEEDS_PAD`, e.g. `y..y` (len >= 4), when using the `NumberPrinterParser` to format it,

```java
switch (signStyle) {
  case EXCEEDS_PAD:
    if (minWidth < 19 && value >= EXCEED_POINTS[minWidth]) {
      buf.append(decimalStyle.getPositiveSign());
    }
    break;
```

the `minWidth` == `len(y..y)` and `EXCEED_POINTS` is

```java
/**
 * Array of 10 to the power of n.
 */
static final long[] EXCEED_POINTS = new long[] {
    0L,
    10L,
    100L,
    1000L,
    10_000L,
    100_000L,
    1_000_000L,
    10_000_000L,
    100_000_000L,
    1_000_000_000L,
    10_000_000_000L,
};
```

So when `len(y..y)` is greater than 10, an `ArrayIndexOutOfBoundsException` will be raised. At the caller side, for `from_unixtime` the exception will be suppressed and a silent data change occurs, while for `date_format` the `ArrayIndexOutOfBoundsException` will propagate.

### Why are the changes needed?

Fix a silent data change.

### Does this PR introduce _any_ user-facing change?

Yes, a SparkUpgradeException will take the place of the `null` result when the pattern contains 10 or more consecutive 'y' or 'Y' letters.

### How was this patch tested?

New tests.

Closes #28684 from yaooqinn/SPARK-31867-2.

Authored-by: Kent Yao
Signed-off-by: Wenchen Fan
(cherry picked from commit 547c5bf55265772780098ee5e29baa6f095c246b)
Signed-off-by: Wenchen Fan
---
 docs/sql-ref-datetime-pattern.md                   |  2 +-
 .../catalyst/util/DateTimeFormatterHelper.scala    | 14 ---
 .../util/DateTimeFormatterHelperSuite.scala        |  3 +--
 .../spark/sql/util/TimestampFormatterSuite.scala   |  5 ++--
 .../test/resources/sql-tests/inputs/datetime.sql   |  4 
 .../sql-tests/results/ansi/datetime.sql.out        | 28 +-
 .../sql-tests/results/datetime-legacy.sql.out      | 26 +++-
 .../resources/sql-tests/results/datetime.sql.out   | 28 +-
 8 files changed, 99 insertions(+), 11 deletions(-)

diff --git a/docs/sql-ref-datetime-pattern.md b/docs/sql-ref-datetime-pattern.md
index 48e85b4..865b947 100644
--- a/docs/sql-ref-datetime-pattern.md
+++ b/docs/sql-ref-datetime-pattern.md
@@ -74,7 +74,7 @@ The count of pattern letters determines the format.
   For formatting, the fraction length would be padded to the number of contiguous 'S' with zeros. Spark supports datetime of micro-of-second precision, which has up to 6 significant digits, but can parse nano-of-second with exceeded part truncated.
-- Year: The count of letters determines the minimum field width below which padding is used. If the count of letters is two, then a reduced two digit form is used. For printing, this outputs the rightmost two digits. For parsing, this will parse using the base value of 2000, resulting in a year within the range 2000 to 2099 inclusive. If the count of letters is less than four (but not two), then the sign is only output for negative years. Otherwise, the sign is output if the pad width is [...]
+- Year: The count of letters determines the minimum field width below which padding is used. If the count of letters is two, then a reduced two digit form is used. For printing, this outputs the rightmost two digits. For parsing, this will parse using the base value of 2000, resulting in a year within the range 2000 to 2099 inclusive. If the count of letters is less than four (but not two), then the sign is only output for negative years. Otherwise, the sign is output if the pad width is [...]
 - Month:
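The JDK behaviour described in the commit message can be reproduced outside Spark with `java.time` alone. A small sketch, assuming a standard JDK 8+:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// With 4 or more 'y' letters the formatter uses SignStyle.EXCEEDS_PAD, and at
// format time NumberPrinterParser reads EXCEED_POINTS[minWidth]. The array has
// 11 entries (10^0 .. 10^10), so a width of 11 or more letters goes out of bounds.
val tenYears    = DateTimeFormatter.ofPattern("y" * 10)
val elevenYears = DateTimeFormatter.ofPattern("y" * 11)

println(tenYears.format(LocalDate.of(1970, 1, 1)))   // 0000001970
try elevenYears.format(LocalDate.of(1970, 1, 1))
catch { case e: ArrayIndexOutOfBoundsException => println("JDK formatter bug: " + e) }
```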
[spark] branch master updated (1b780f3 -> 547c5bf)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 1b780f3  [SPARK-31866][SQL][DOCS] Add COALESCE/REPARTITION/REPARTITION_BY_RANGE Hints to SQL Reference
     add 547c5bf  [SPARK-31867][SQL] Disable year type datetime patterns which are longer than 10

No new revisions were added by this update.

Summary of changes:
 docs/sql-ref-datetime-pattern.md                   |  2 +-
 .../catalyst/util/DateTimeFormatterHelper.scala    | 14 ---
 .../util/DateTimeFormatterHelperSuite.scala        |  2 +-
 .../spark/sql/util/TimestampFormatterSuite.scala   |  5 ++--
 .../test/resources/sql-tests/inputs/datetime.sql   |  4 
 .../sql-tests/results/ansi/datetime.sql.out        | 28 +-
 .../sql-tests/results/datetime-legacy.sql.out      | 26 +++-
 .../resources/sql-tests/results/datetime.sql.out   | 28 +-
 8 files changed, 99 insertions(+), 10 deletions(-)