[spark] branch master updated: [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9dc39e199de [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client 9dc39e199de is described below commit 9dc39e199de645f60e115267fba2fae782ab53f1 Author: Rui Wang AuthorDate: Thu Nov 10 12:11:40 2022 +0800 [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client ### What changes were proposed in this pull request? 1. Add support for intersect and except. 2. Unify union, intersect and except into `SetOperation`. ### Why are the changes needed? Improve API coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38506 from amaliujia/except_python. Authored-by: Rui Wang Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/dataframe.py| 82 +- python/pyspark/sql/connect/plan.py | 38 +++--- .../sql/tests/connect/test_connect_plan_only.py| 22 ++ 3 files changed, 132 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index c6877707ad2..ccd826cd476 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -389,7 +389,9 @@ class DataFrame(object): def unionAll(self, other: "DataFrame") -> "DataFrame": if other._plan is None: raise ValueError("Argument to Union does not contain a valid plan.") -return DataFrame.withPlan(plan.UnionAll(self._plan, other._plan), session=self._session) +return DataFrame.withPlan( +plan.SetOperation(self._plan, other._plan, "union", is_all=True), session=self._session +) def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> "DataFrame": """Returns a new :class:`DataFrame` containing union of rows in this and another @@ -415,7 +417,83 @@ class 
DataFrame(object): if other._plan is None: raise ValueError("Argument to UnionByName does not contain a valid plan.") return DataFrame.withPlan( -plan.UnionAll(self._plan, other._plan, allowMissingColumns), session=self._session +plan.SetOperation( +self._plan, other._plan, "union", is_all=True, by_name=allowMissingColumns +), +session=self._session, +) + +def exceptAll(self, other: "DataFrame") -> "DataFrame": +"""Return a new :class:`DataFrame` containing rows in this :class:`DataFrame` but +not in another :class:`DataFrame` while preserving duplicates. + +This is equivalent to `EXCEPT ALL` in SQL. +As standard in SQL, this function resolves columns by position (not by name). + +.. versionadded:: 3.4.0 + +Parameters +-- +other : :class:`DataFrame` +The other :class:`DataFrame` to compare to. + +Returns +--- +:class:`DataFrame` +""" +return DataFrame.withPlan( +plan.SetOperation(self._plan, other._plan, "except", is_all=True), session=self._session +) + +def intersect(self, other: "DataFrame") -> "DataFrame": +"""Return a new :class:`DataFrame` containing rows only in +both this :class:`DataFrame` and another :class:`DataFrame`. +Note that any duplicates are removed. To preserve duplicates +use :func:`intersectAll`. + +.. versionadded:: 3.4.0 + +Parameters +-- +other : :class:`DataFrame` +Another :class:`DataFrame` that needs to be combined. + +Returns +--- +:class:`DataFrame` +Combined DataFrame. + +Notes +- +This is equivalent to `INTERSECT` in SQL. +""" +return DataFrame.withPlan( +plan.SetOperation(self._plan, other._plan, "intersect", is_all=False), +session=self._session, +) + +def intersectAll(self, other: "DataFrame") -> "DataFrame": +"""Return a new :class:`DataFrame` containing rows in both this :class:`DataFrame` +and another :class:`DataFrame` while preserving duplicates. + +This is equivalent to `INTERSECT ALL` in SQL. As standard in SQL, this function +resolves columns by position (not by name). + +
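The multiset semantics that the new `SetOperation` plan node exposes through `exceptAll`, `intersect`, and `intersectAll` can be sketched in plain Python. This is a stand-alone illustration of the SQL `EXCEPT ALL` / `INTERSECT [ALL]` rules the docstrings above describe, not the Connect client itself; the helper names are hypothetical:

```python
from collections import Counter

def except_all(left, right):
    # EXCEPT ALL: each left row survives count_left - count_right times
    # (floored at zero), preserving left's row order.
    remaining = Counter(right)
    out = []
    for row in left:
        if remaining[row] > 0:
            remaining[row] -= 1      # matched against a right-side copy
        else:
            out.append(row)
    return out

def intersect_all(left, right):
    # INTERSECT ALL: each row appears min(count_left, count_right) times.
    remaining = Counter(right)
    out = []
    for row in left:
        if remaining[row] > 0:
            remaining[row] -= 1
            out.append(row)
    return out

def intersect(left, right):
    # INTERSECT: distinct rows present in both inputs, in order of first
    # appearance on the left (duplicates removed).
    right_rows = set(right)
    seen = set()
    out = []
    for row in left:
        if row in right_rows and row not in seen:
            seen.add(row)
            out.append(row)
    return out

left = [("a", 1), ("a", 1), ("b", 2), ("c", 3)]
right = [("a", 1), ("b", 2), ("b", 2)]
print(except_all(left, right))     # [('a', 1), ('c', 3)]
print(intersect_all(left, right))  # [('a', 1), ('b', 2)]
print(intersect(left, right))      # [('a', 1), ('b', 2)]
```

In the client itself these would be invoked as `df1.exceptAll(df2)`, `df1.intersect(df2)`, and `df1.intersectAll(df2)`, each building a `plan.SetOperation` with the appropriate `is_all` flag, as the diff shows.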
[spark] branch master updated: [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 736be3116c7 [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client 736be3116c7 is described below commit 736be3116c7c13c82eac91f426ee6e96753c9cf5 Author: Rui Wang AuthorDate: Wed Nov 9 17:35:43 2022 +0800 [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client ### What changes were proposed in this pull request? Following up https://github.com/apache/spark/pull/38529, with `Reparitition` proto we can support `Coalesce` and `Repartition` API in Python client. ### Why are the changes needed? Improve API coverage ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #38549 from amaliujia/support_coalesce_in_python. Authored-by: Rui Wang Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/dataframe.py| 49 +- python/pyspark/sql/connect/plan.py | 34 +++ .../sql/tests/connect/test_connect_plan_only.py| 17 3 files changed, 98 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 64b2e54f0ef..c6877707ad2 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -156,8 +156,53 @@ class DataFrame(object): def crossJoin(self, other: "DataFrame") -> "DataFrame": ... -def coalesce(self, num_partitions: int) -> "DataFrame": -... +def coalesce(self, numPartitions: int) -> "DataFrame": +""" +Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions. + +Coalesce does not trigger a shuffle. + +.. 
versionadded:: 3.4.0 + +Parameters +-- +numPartitions : int +specify the target number of partitions + +Returns +--- +:class:`DataFrame` +""" +if not numPartitions > 0: +raise ValueError("numPartitions must be positive.") +return DataFrame.withPlan( +plan.Repartition(self._plan, num_partitions=numPartitions, shuffle=False), +self._session, +) + +def repartition(self, numPartitions: int) -> "DataFrame": +""" +Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions. + +Repartition will shuffle source partition into partitions specified by numPartitions. + +.. versionadded:: 3.4.0 + +Parameters +-- +numPartitions : int +specify the target number of partitions + +Returns +--- +:class:`DataFrame` +""" +if not numPartitions > 0: +raise ValueError("numPartitions must be positive.") +return DataFrame.withPlan( +plan.Repartition(self._plan, num_partitions=numPartitions, shuffle=True), +self._session, +) def describe(self, cols: List[ColumnRef]) -> Any: ... diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 1d5c80f510e..3bb5558d04b 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -652,6 +652,40 @@ class UnionAll(LogicalPlan): """ +class Repartition(LogicalPlan): +"""Repartition Relation into a different number of partitions.""" + +def __init__(self, child: Optional["LogicalPlan"], num_partitions: int, shuffle: bool) -> None: +super().__init__(child) +self._num_partitions = num_partitions +self._shuffle = shuffle + +def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: +rel = proto.Relation() +if self._child is not None: +rel.repartition.input.CopyFrom(self._child.plan(session)) +rel.repartition.shuffle = self._shuffle +rel.repartition.num_partitions = self._num_partitions +return rel + +def print(self, indent: int = 0) -> str: +plan_name = "repartition" if self._shuffle else "coalesce" +c_buf = self._child.print(indent + LogicalPlan.INDENT) if 
self._child else "" +return f"{' ' * indent}<{plan_name} num_partitions={self._num_partitions}>\n{c_buf}" + +def _repr_html_(self) -> str: +plan_name = "repartition" if sel
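The distinction the patch draws between `coalesce` (no shuffle, cannot increase the partition count) and `repartition` (full shuffle to exactly `numPartitions`), including the shared "numPartitions must be positive" check, can be illustrated with a toy partition model. The round-robin placement below is an assumption for illustration only; Spark's actual partition merging is locality-driven:

```python
def coalesce(partitions, num_partitions):
    # Coalesce narrows to at most num_partitions by merging existing
    # partitions in place; no shuffle, so the count never increases.
    if not num_partitions > 0:
        raise ValueError("numPartitions must be positive.")
    n = min(num_partitions, len(partitions))
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)   # illustrative merge strategy
    return merged

def repartition(partitions, num_partitions):
    # Repartition performs a full shuffle: every row may move, and the
    # result has exactly num_partitions partitions.
    if not num_partitions > 0:
        raise ValueError("numPartitions must be positive.")
    out = [[] for _ in range(num_partitions)]
    rows = (r for part in partitions for r in part)
    for i, row in enumerate(rows):
        out[i % num_partitions].append(row)
    return out

parts = [[1, 2], [3], [4, 5, 6]]
print(coalesce(parts, 2))      # [[1, 2, 4, 5, 6], [3]]
print(repartition(parts, 4))   # 4 partitions, rows dealt round-robin
```

This mirrors why both client methods validate `numPartitions` before building the shared `Repartition` plan node and differ only in its `shuffle` flag.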
[spark] branch master updated: [SPARK-41002][CONNECT][PYTHON] Compatible `take`, `head` and `first` API in Python client
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2a361b9ddfa [SPARK-41002][CONNECT][PYTHON] Compatible `take`, `head` and `first` API in Python client 2a361b9ddfa is described below commit 2a361b9ddfa766c719399b35c38f4dafe68353ee Author: Rui Wang AuthorDate: Tue Nov 8 08:30:49 2022 +0800 [SPARK-41002][CONNECT][PYTHON] Compatible `take`, `head` and `first` API in Python client ### What changes were proposed in this pull request? 1. Add `take(n)` API. 2. Change `head(n)` API to return `Union[Optional[Row], List[Row]]`. 3. Update `first()` to return `Optional[Row]`. ### Why are the changes needed? Improve API coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38488 from amaliujia/SPARK-41002. Authored-by: Rui Wang Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/dataframe.py| 61 -- .../sql/tests/connect/test_connect_basic.py| 36 +++-- 2 files changed, 90 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index b9ba4b99ba0..9eecdbb7145 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -24,6 +24,7 @@ from typing import ( Tuple, Union, TYPE_CHECKING, +overload, ) import pandas @@ -211,14 +212,66 @@ class DataFrame(object): plan.Filter(child=self._plan, filter=condition), session=self._session ) -def first(self) -> Optional["pandas.DataFrame"]: -return self.head(1) +def first(self) -> Optional[Row]: +"""Returns the first row as a :class:`Row`. + +.. versionadded:: 3.4.0 + +Returns +--- +:class:`Row` + First row if :class:`DataFrame` is not empty, otherwise ``None``. 
+""" +return self.head() def groupBy(self, *cols: "ColumnOrString") -> GroupingFrame: return GroupingFrame(self, *cols) -def head(self, n: int) -> Optional["pandas.DataFrame"]: -return self.limit(n).toPandas() +@overload +def head(self) -> Optional[Row]: +... + +@overload +def head(self, n: int) -> List[Row]: +... + +def head(self, n: Optional[int] = None) -> Union[Optional[Row], List[Row]]: +"""Returns the first ``n`` rows. + +.. versionadded:: 3.4.0 + +Parameters +-- +n : int, optional +default 1. Number of rows to return. + +Returns +--- +If n is greater than 1, return a list of :class:`Row`. +If n is 1, return a single Row. +""" +if n is None: +rs = self.head(1) +return rs[0] if rs else None +return self.take(n) + +def take(self, num: int) -> List[Row]: +"""Returns the first ``num`` rows as a :class:`list` of :class:`Row`. + +.. versionadded:: 3.4.0 + +Parameters +-- +num : int +Number of records to return. Will return this number of records +or whataver number is available. + +Returns +--- +list +List of rows +""" +return self.limit(num).collect() # TODO: extend `on` to also be type List[ColumnRef]. 
def join( diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index 18a752ee19d..a0f046907f7 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -46,6 +46,7 @@ class SparkConnectSQLTestCase(ReusedPySparkTestCase): if have_pandas: connect: RemoteSparkSession tbl_name: str +tbl_name_empty: str df_text: "DataFrame" @classmethod @@ -61,6 +62,7 @@ class SparkConnectSQLTestCase(ReusedPySparkTestCase): cls.df_text = cls.sc.parallelize(cls.testDataStr).toDF() cls.tbl_name = "test_connect_basic_table_1" +cls.tbl_name_empty = "test_connect_basic_table_empty" # Cleanup test data cls.spark_connect_clean_up_test_data() @@ -80,10 +82,21 @@ class SparkConnectSQLTestCase(ReusedPySparkTestCase): # Since we might create multiple Spark sessions, we need to create global temporary view # that is specifically maintained in the "global_temp" schema. df.write.saveAsTable(cls.tbl_nam
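The `first`/`head`/`take` dispatch introduced by this commit is easy to mirror in a self-contained toy class. `MiniFrame` and its tuple stand-in for `Row` are hypothetical; the method bodies follow the patch's logic (`first() == head()`, `head()` returns an optional single row, `head(n)` delegates to `take(n)`, and `take(n)` is `limit(n).collect()`):

```python
from typing import List, Optional, Union

Row = tuple  # stand-in for pyspark's Row in this sketch

class MiniFrame:
    """Toy frame mirroring the head/take/first dispatch in the patch."""

    def __init__(self, rows: List[Row]) -> None:
        self._rows = rows

    def limit(self, n: int) -> "MiniFrame":
        return MiniFrame(self._rows[:n])

    def collect(self) -> List[Row]:
        return list(self._rows)

    def take(self, num: int) -> List[Row]:
        # Returns num rows, or whatever number is available.
        return self.limit(num).collect()

    def head(self, n: Optional[int] = None) -> Union[Optional[Row], List[Row]]:
        # No argument: a single Row or None; with n: a list of Rows.
        if n is None:
            rs = self.head(1)
            return rs[0] if rs else None
        return self.take(n)

    def first(self) -> Optional[Row]:
        return self.head()

df = MiniFrame([(1, "a"), (2, "b"), (3, "c")])
print(df.first())            # (1, 'a')
print(df.head(2))            # [(1, 'a'), (2, 'b')]
print(MiniFrame([]).head())  # None
```

The asymmetric return type is why the real client uses `@overload` stubs: `head()` is typed `Optional[Row]` while `head(n)` is typed `List[Row]`.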
[spark] branch master updated: [SPARK-40883][CONNECT][FOLLOW-UP] Range.step is required and Python client should have a default value=1
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f03fdf90281 [SPARK-40883][CONNECT][FOLLOW-UP] Range.step is required and Python client should have a default value=1 f03fdf90281 is described below commit f03fdf90281d67065b9ab211b5cd9cfbe5742614 Author: Rui Wang AuthorDate: Wed Nov 2 14:10:13 2022 +0800 [SPARK-40883][CONNECT][FOLLOW-UP] Range.step is required and Python client should have a default value=1 ### What changes were proposed in this pull request? To match existing Python DataFarme API, this PR changes the `Range.step` as required and Python client keep `1` as a default value for this field. ### Why are the changes needed? Matching existing DataFrame API. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #38471 from amaliujia/range_step_required. Authored-by: Rui Wang Signed-off-by: Ruifeng Zheng --- .../main/protobuf/spark/connect/relations.proto| 8 ++-- .../org/apache/spark/sql/connect/dsl/package.scala | 4 +++- .../sql/connect/planner/SparkConnectPlanner.scala | 6 +- python/pyspark/sql/connect/client.py | 2 +- python/pyspark/sql/connect/plan.py | 7 ++- python/pyspark/sql/connect/proto/relations_pb2.py | 14 ++ python/pyspark/sql/connect/proto/relations_pb2.pyi | 22 -- .../sql/tests/connect/test_connect_plan_only.py| 4 ++-- python/pyspark/testing/connectutils.py | 2 +- 9 files changed, 22 insertions(+), 47 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto index a4503204aa1..deb35525728 100644 --- a/connector/connect/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto @@ -226,16 +226,12 @@ message Range { int64 start = 1; // Required. 
int64 end = 2; - // Optional. Default value = 1 - Step step = 3; + // Required. + int64 step = 3; // Optional. Default value is assigned by 1) SQL conf "spark.sql.leafNodeDefaultParallelism" if // it is set, or 2) spark default parallelism. NumPartitions num_partitions = 4; - message Step { -int64 step = 1; - } - message NumPartitions { int32 num_partitions = 1; } diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index f649d040721..e2030c9ad31 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -190,7 +190,9 @@ package object dsl { } range.setEnd(end) if (step.isDefined) { - range.setStep(proto.Range.Step.newBuilder().setStep(step.get)) + range.setStep(step.get) +} else { + range.setStep(1L) } if (numPartitions.isDefined) { range.setNumPartitions( diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index eea2579e61f..f5c6980290f 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -110,11 +110,7 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { private def transformRange(rel: proto.Range): LogicalPlan = { val start = rel.getStart val end = rel.getEnd -val step = if (rel.hasStep) { - rel.getStep.getStep -} else { - 1 -} +val step = rel.getStep val numPartitions = if (rel.hasNumPartitions) { rel.getNumPartitions.getNumPartitions } else { diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py index e64d612c53e..c845d378320 100644 --- 
a/python/pyspark/sql/connect/client.py +++ b/python/pyspark/sql/connect/client.py @@ -149,7 +149,7 @@ class RemoteSparkSession(object): self, start: int, end: int, -step: Optional[int] = None, +step: int = 1, numPartitions: Optional[int] = None, ) -> DataFrame: """ diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 71c971d9e91..2f1f70ec1a9 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -705,7 +705,
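The effect of making `Range.step` a required proto field is that the defaulting moves to the client: the Python side always fills the field, falling back to `1` when the caller omits it. A minimal sketch of that client-side defaulting — the `build_range` helper and its dict stand-in for the proto message are hypothetical:

```python
from typing import Optional

def build_range(start: int, end: int, step: int = 1,
                num_partitions: Optional[int] = None) -> dict:
    """Build a dict standing in for the Range relation proto.

    `step` is always present in the output (required field); callers who
    omit it get the default of 1, matching the existing DataFrame API.
    """
    if step == 0:
        raise ValueError("step cannot be 0")  # sanity check for the sketch
    rel = {"start": start, "end": end, "step": step}
    if num_partitions is not None:
        # num_partitions stays optional in the proto; omitted means the
        # server picks a default (SQL conf or default parallelism).
        rel["num_partitions"] = num_partitions
    return rel

print(build_range(0, 10))           # {'start': 0, 'end': 10, 'step': 1}
print(build_range(0, 10, step=2))   # {'start': 0, 'end': 10, 'step': 2}
```

On the server side this lets `transformRange` read `rel.getStep` directly instead of branching on `rel.hasStep`, as the Scala diff above shows.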
[spark] branch master updated (968463b070e -> 219a49c9354)
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 968463b070e [SPARK-40981][CONNECT][PYTHON] Support session.range in Python client
     add 219a49c9354 [SPARK-40980][CONNECT][TEST] Support session.sql in Connect DSL

No new revisions were added by this update.

Summary of changes:
 .../src/main/scala/org/apache/spark/sql/connect/dsl/package.scala     | 4 ++++
 .../org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala | 4 ++++
 2 files changed, 8 insertions(+)

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40981][CONNECT][PYTHON] Support session.range in Python client
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 968463b070e [SPARK-40981][CONNECT][PYTHON] Support session.range in Python client 968463b070e is described below commit 968463b070eac325f7d018b13e27c5694f33089e Author: Rui Wang AuthorDate: Tue Nov 1 16:51:33 2022 +0800 [SPARK-40981][CONNECT][PYTHON] Support session.range in Python client ### What changes were proposed in this pull request? This PR adds `range` API to Python client's `RemoteSparkSession` with tests. This PR also updates `start`, `end`, `step` to `int64` in the Connect proto. ### Why are the changes needed? Improve API coverage. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #38460 from amaliujia/SPARK-40981. Authored-by: Rui Wang Signed-off-by: Ruifeng Zheng --- .../main/protobuf/spark/connect/relations.proto| 6 +-- .../org/apache/spark/sql/connect/dsl/package.scala | 6 +-- python/pyspark/sql/connect/client.py | 35 ++- python/pyspark/sql/connect/plan.py | 50 ++ python/pyspark/sql/connect/proto/relations_pb2.py | 2 +- .../sql/tests/connect/test_connect_basic.py| 17 .../sql/tests/connect/test_connect_plan_only.py| 15 +++ python/pyspark/testing/connectutils.py | 18 +++- 8 files changed, 139 insertions(+), 10 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto index e88e70ceb73..a4503204aa1 100644 --- a/connector/connect/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto @@ -223,9 +223,9 @@ message Sample { // Relation of type [[Range]] that generates a sequence of integers. message Range { // Optional. Default value = 0 - int32 start = 1; + int64 start = 1; // Required. 
- int32 end = 2; + int64 end = 2; // Optional. Default value = 1 Step step = 3; // Optional. Default value is assigned by 1) SQL conf "spark.sql.leafNodeDefaultParallelism" if @@ -233,7 +233,7 @@ message Range { NumPartitions num_partitions = 4; message Step { -int32 step = 1; +int64 step = 1; } message NumPartitions { diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 54e51868c75..067b6e42ec2 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -180,9 +180,9 @@ package object dsl { object plans { // scalastyle:ignore implicit class DslMockRemoteSession(val session: MockRemoteSession) { def range( - start: Option[Int], - end: Int, - step: Option[Int], + start: Option[Long], + end: Long, + step: Option[Long], numPartitions: Option[Int]): Relation = { val range = proto.Range.newBuilder() if (start.isDefined) { diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py index f4b6d2ec302..e64d612c53e 100644 --- a/python/pyspark/sql/connect/client.py +++ b/python/pyspark/sql/connect/client.py @@ -32,7 +32,7 @@ import pyspark.sql.types from pyspark import cloudpickle from pyspark.sql.connect.dataframe import DataFrame from pyspark.sql.connect.readwriter import DataFrameReader -from pyspark.sql.connect.plan import SQL +from pyspark.sql.connect.plan import SQL, Range from pyspark.sql.types import DataType, StructType, StructField, LongType, StringType from typing import Optional, Any, Union @@ -145,6 +145,39 @@ class RemoteSparkSession(object): def sql(self, sql_string: str) -> "DataFrame": return DataFrame.withPlan(SQL(sql_string), self) +def range( +self, +start: int, +end: int, +step: Optional[int] = None, +numPartitions: Optional[int] = None, +) -> DataFrame: +""" +Create a 
:class:`DataFrame` with column named ``id`` and typed Long, +containing elements in a range from ``start`` to ``end`` (exclusive) with +step value ``step``. + +.. versionadded:: 3.4.0 + +Parameters +-- +start : int +the start value +end : int +the end value (exclusive) +step : int, optional +the incremental step (default: 1) +numPartitions : int, optional +the number of parti
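Per the docstring above, `range` produces a DataFrame with a single `id` column running from `start` to `end` (exclusive) in increments of `step`. For a positive step this matches Python's built-in `range`, as the sketch below assumes (`connect_range` is a hypothetical pure-Python model, not the client API):

```python
def connect_range(start, end, step=1):
    # Model each output row as a one-column dict keyed by "id",
    # end-exclusive, mirroring the documented semantics.
    return [{"id": i} for i in range(start, end, step)]

print(connect_range(0, 6, 2))  # [{'id': 0}, {'id': 2}, {'id': 4}]
print(connect_range(1, 4))     # [{'id': 1}, {'id': 2}, {'id': 3}]
```

In the real client the equivalent call is `session.range(start, end, step, numPartitions)`, which builds the `Range` plan node added in this commit.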
[spark] branch master updated: [SPARK-40949][CONNECT][PYTHON] Implement `DataFrame.sortWithinPartitions`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fb64041cea3 [SPARK-40949][CONNECT][PYTHON] Implement `DataFrame.sortWithinPartitions` fb64041cea3 is described below commit fb64041cea3e094b0807cb580deacc721b302408 Author: Ruifeng Zheng AuthorDate: Tue Nov 1 12:48:47 2022 +0800 [SPARK-40949][CONNECT][PYTHON] Implement `DataFrame.sortWithinPartitions` ### What changes were proposed in this pull request? Implement `DataFrame.sortWithinPartitions` ### Why are the changes needed? for api coverage ### Does this PR introduce _any_ user-facing change? yes, new method ### How was this patch tested? added UT Closes #38423 from zhengruifeng/connect_df_sortWithinPartitions. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../main/protobuf/spark/connect/relations.proto| 1 + .../org/apache/spark/sql/connect/dsl/package.scala | 39 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 2 +- .../connect/planner/SparkConnectPlannerSuite.scala | 22 -- .../connect/planner/SparkConnectProtoSuite.scala | 10 + python/pyspark/sql/connect/dataframe.py| 10 - python/pyspark/sql/connect/plan.py | 18 +--- python/pyspark/sql/connect/proto/relations_pb2.py | 48 +++--- python/pyspark/sql/connect/proto/relations_pb2.pyi | 7 +++- .../sql/tests/connect/test_connect_plan_only.py| 26 10 files changed, 147 insertions(+), 36 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto index e519a564d5c..e88e70ceb73 100644 --- a/connector/connect/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto @@ -173,6 +173,7 @@ message Aggregate { message Sort { Relation input = 1; repeated SortField sort_fields = 2; + bool is_global = 3; message 
SortField { Expression expression = 1; diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index ae884864006..54e51868c75 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -331,6 +331,45 @@ package object dsl { .build() } + def createDefaultSortField(col: String): Sort.SortField = { +Sort.SortField + .newBuilder() + .setNulls(Sort.SortNulls.SORT_NULLS_FIRST) + .setDirection(Sort.SortDirection.SORT_DIRECTION_ASCENDING) + .setExpression( +Expression.newBuilder + .setUnresolvedAttribute( + Expression.UnresolvedAttribute.newBuilder.setUnparsedIdentifier(col).build()) + .build()) + .build() + } + + def sort(columns: String*): Relation = { +Relation + .newBuilder() + .setSort( +Sort + .newBuilder() + .setInput(logicalPlan) + .addAllSortFields(columns.map(createDefaultSortField).asJava) + .setIsGlobal(true) + .build()) + .build() + } + + def sortWithinPartitions(columns: String*): Relation = { +Relation + .newBuilder() + .setSort( +Sort + .newBuilder() + .setInput(logicalPlan) + .addAllSortFields(columns.map(createDefaultSortField).asJava) + .setIsGlobal(false) + .build()) + .build() + } + def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*): Relation = { val agg = Aggregate.newBuilder() agg.setInput(logicalPlan) diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index cb04d6eaf29..eea2579e61f 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -364,7 +364,7 @@ class SparkConnectPlanner(plan: 
proto.Relation, session: SparkSession) { assert(rel.getSortFieldsCount > 0, "'sort_fields' must be present and contain elements.") logical.Sort( child = transformRelation(rel.getInput), - global = true, + global = rel.getIsGlobal, order = rel.getSortFieldsList.asScala.map(transformSortOrderExpr
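The new `is_global` flag on the `Sort` relation distinguishes a total sort (`DataFrame.sort`) from a per-partition sort (`DataFrame.sortWithinPartitions`, which avoids a shuffle). A toy model of the two behaviors — the single-list result for the global case is a simplification, since Spark actually range-partitions the output of a global sort:

```python
def sort_partitions(partitions, is_global):
    """is_global=True mimics sort (one total order over all rows);
    is_global=False mimics sortWithinPartitions (each partition sorted
    independently, partition boundaries untouched)."""
    if is_global:
        return [sorted(r for part in partitions for r in part)]
    return [sorted(part) for part in partitions]

parts = [[3, 1], [5, 2, 4]]
print(sort_partitions(parts, is_global=False))  # [[1, 3], [2, 4, 5]]
print(sort_partitions(parts, is_global=True))   # [[1, 2, 3, 4, 5]]
```

This is why the planner change above replaces the hard-coded `global = true` with `global = rel.getIsGlobal`: both client methods now share one proto message and differ only in that flag.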
[spark-website] branch asf-site updated: Use docker image in the example of SQL/Scala/Java (#424)
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 58384eb35  Use docker image in the example of SQL/Scala/Java (#424)
58384eb35 is described below

commit 58384eb357d0705223d741039d32e33529683f1f
Author: Ruifeng Zheng
AuthorDate: Tue Nov 1 09:33:08 2022 +0800

    Use docker image in the example of SQL/Scala/Java (#424)

    * init
    * del
    * address comments
    * Apply suggestions from code review

    Co-authored-by: Liang-Chi Hsieh

Co-authored-by: Liang-Chi Hsieh
---
 index.md        | 6 +++---
 site/index.html | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/index.md b/index.md
index 1aceb4c30..8b2dcd244 100644
--- a/index.md
+++ b/index.md
@@ -155,7 +155,7 @@ filtered_df.summary().show()
 Run now
-$ SPARK-HOME/bin/spark-sql
+$ docker run -it --rm apache/spark /opt/spark/bin/spark-sql
 spark-sql>
@@ -175,7 +175,7 @@ FROM json.`logs.json`
 Run now
-$ SPARK-HOME/bin/spark-shell
+$ docker run -it --rm apache/spark /opt/spark/bin/spark-shell
 scala>
@@ -193,7 +193,7 @@ df.where("age > 21")
 Run now
-$ SPARK-HOME/bin/spark-shell
+$ docker run -it --rm apache/spark /opt/spark/bin/spark-shell
 scala>
diff --git a/site/index.html b/site/index.html
index 4571fe6a6..7c9f69009 100644
--- a/site/index.html
+++ b/site/index.html
@@ -273,7 +273,7 @@
 Run now
-$ SPARK-HOME/bin/spark-sql
+$ docker run -it --rm apache/spark /opt/spark/bin/spark-sql
 spark-sql
@@ -293,7 +293,7 @@
 Run now
-$ SPARK-HOME/bin/spark-shell
+$ docker run -it --rm apache/spark /opt/spark/bin/spark-shell
 scala
@@ -310,7 +310,7 @@
 Run now
-$ SPARK-HOME/bin/spark-shell
+$ docker run -it --rm apache/spark /opt/spark/bin/spark-shell
 scala
[spark] branch master updated: [SPARK-40512][SPARK-40896][PS][INFRA] Upgrade pandas to 1.5.0
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cf086b10de7 [SPARK-40512][SPARK-40896][PS][INFRA] Upgrade pandas to 1.5.0 cf086b10de7 is described below commit cf086b10de784fc92ae8b4d16065823ace520a7a Author: itholic AuthorDate: Fri Oct 28 11:23:55 2022 +0800 [SPARK-40512][SPARK-40896][PS][INFRA] Upgrade pandas to 1.5.0 ### What changes were proposed in this pull request? This PR proposes to upgrade pandas version to 1.5.0 since the new pandas version is released. Please refer to [What's new in 1.5.0](https://pandas.pydata.org/docs/whatsnew/v1.5.0.html) for more detail. ### Why are the changes needed? Pandas API on Spark should follow the latest pandas. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? The existing tests should pass Closes #37955 from itholic/SPARK-40512. 
Authored-by: itholic Signed-off-by: Ruifeng Zheng --- dev/infra/Dockerfile | 4 ++-- python/pyspark/pandas/base.py | 12 ++-- python/pyspark/pandas/indexes/datetimes.py | 28 ++-- python/pyspark/pandas/strings.py | 4 ++-- python/pyspark/pandas/supported_api_gen.py | 2 +- 5 files changed, 25 insertions(+), 25 deletions(-) diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile index 2a70bd3f98f..24bad4db408 100644 --- a/dev/infra/Dockerfile +++ b/dev/infra/Dockerfile @@ -32,7 +32,7 @@ RUN $APT_INSTALL software-properties-common git libxml2-dev pkg-config curl wget RUN update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.9 -RUN python3.9 -m pip install numpy pyarrow 'pandas<=1.4.4' scipy unittest-xml-reporting plotly>=4.8 sklearn 'mlflow>=1.0' coverage matplotlib openpyxl +RUN python3.9 -m pip install numpy pyarrow 'pandas<=1.5.0' scipy unittest-xml-reporting plotly>=4.8 sklearn 'mlflow>=1.0' coverage matplotlib openpyxl RUN add-apt-repository ppa:pypy/ppa RUN apt update @@ -45,7 +45,7 @@ RUN mkdir -p /usr/local/pypy/pypy3.7 && \ ln -sf /usr/local/pypy/pypy3.7/bin/pypy /usr/local/bin/pypy3 RUN curl -sS https://bootstrap.pypa.io/get-pip.py | pypy3 -RUN pypy3 -m pip install numpy 'pandas<=1.4.4' scipy coverage matplotlib +RUN pypy3 -m pip install numpy 'pandas<=1.5.0' scipy coverage matplotlib RUN $APT_INSTALL gnupg ca-certificates pandoc RUN echo 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/' >> /etc/apt/sources.list diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py index 755e111bff8..350674e5e52 100644 --- a/python/pyspark/pandas/base.py +++ b/python/pyspark/pandas/base.py @@ -866,8 +866,8 @@ class IndexOpsMixin(object, metaclass=ABCMeta): 5False Name: animal, dtype: bool ->>> s.rename("a").to_frame().set_index("a").index.isin(['lama']) -Index([True, False, True, False, True, False], dtype='object', name='a') +>>> 
s.rename("a").to_frame().set_index("a").index.isin(['lama']) # doctest: +SKIP +Index([True, False, True, False, True, False], dtype='bool', name='a') """ if not is_list_like(values): raise TypeError( @@ -910,8 +910,8 @@ class IndexOpsMixin(object, metaclass=ABCMeta): 2 True dtype: bool ->>> ser.rename("a").to_frame().set_index("a").index.isna() -Index([False, False, True], dtype='object', name='a') +>>> ser.rename("a").to_frame().set_index("a").index.isna() # doctest: +SKIP +Index([False, False, True], dtype='bool', name='a') """ from pyspark.pandas.indexes import MultiIndex @@ -953,8 +953,8 @@ class IndexOpsMixin(object, metaclass=ABCMeta): 2False dtype: bool ->>> ser.rename("a").to_frame().set_index("a").index.notna() -Index([True, True, False], dtype='object', name='a') +>>> ser.rename("a").to_frame().set_index("a").index.notna() # doctest: +SKIP +Index([True, True, False], dtype='bool', name='a') """ from pyspark.pandas.indexes import MultiIndex diff --git a/python/pyspark/pandas/indexes/datetimes.py b/python/pyspark/pandas/indexes/datetimes.py index b4a7c1e8356..3343014c6f8 100644 --- a/python/pyspark/pandas/indexes/datetimes.py +++ b/python/pyspark/pandas/indexes/datetimes.py @@ -284,8 +284,8 @@ class DatetimeIndex(Index): Examples
[spark] branch master updated (8a96f69bb53 -> f0950fea814)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 8a96f69bb53 [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled add f0950fea814 [SPARK-40878][INFRA] pin 'grpcio==1.48.1' 'protobuf==4.21.6' No new revisions were added by this update. Summary of changes: .github/workflows/build_and_test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (98f9edabb45 -> 6545a0873df)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 98f9edabb45 [SPARK-40796][CONNECT][FOLLOW-UP] Improve README for proto generated files in Connect Python client add 6545a0873df [SPARK-40796][CONNECT][DOC][FOLLOW-UP] Add check command in Readme No new revisions were added by this update. Summary of changes: python/pyspark/sql/connect/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[spark] branch master updated (748fa2792e4 -> 98f9edabb45)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 748fa2792e4 [SPARK-40863][BUILD] Upgrade dropwizard metrics 4.2.12 add 98f9edabb45 [SPARK-40796][CONNECT][FOLLOW-UP] Improve README for proto generated files in Connect Python client No new revisions were added by this update. Summary of changes: dev/check-codegen-python.py | 4 +++- python/pyspark/sql/connect/README.md | 5 + 2 files changed, 8 insertions(+), 1 deletion(-)
[spark] branch master updated: [SPARK-40853][INFRA] Pin `mypy-protobuf==3.3.0`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0643d02e4f0 [SPARK-40853][INFRA] Pin `mypy-protobuf==3.3.0` 0643d02e4f0 is described below commit 0643d02e4f03cdadb53efc05af0b6533d22db297 Author: Ruifeng Zheng AuthorDate: Thu Oct 20 17:41:16 2022 +0800 [SPARK-40853][INFRA] Pin `mypy-protobuf==3.3.0` ### What changes were proposed in this pull request? [`mypy-protobuf` 3.4.0](https://pypi.org/project/mypy-protobuf/#history) is just released, and will break master shortly ### Why are the changes needed? to make CI happy ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? manually check Closes #38316 from zhengruifeng/infra_ping_mypy_protobuf. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .github/workflows/build_and_test.yml | 2 +- dev/requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index a3998b330c1..12a1ad0e71e 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -587,7 +587,7 @@ jobs: # See also https://issues.apache.org/jira/browse/SPARK-38279. 
python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme ipython nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0' python3.9 -m pip install ipython_genutils # See SPARK-38517 -python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' grpcio protobuf mypy-protobuf +python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' grpcio protobuf 'mypy-protobuf==3.3.0' python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421 apt-get update -y apt-get install -y ruby ruby-dev diff --git a/dev/requirements.txt b/dev/requirements.txt index 651fc280627..fa4b6752f14 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -51,4 +51,4 @@ grpcio==1.48.1 protobuf==4.21.6 # Spark Connect python proto generation plugin (optional) -mypy-protobuf +mypy-protobuf==3.3.0
[spark] branch master updated: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 42790905668 [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache 42790905668 is described below commit 42790905668effc2c0c081bae7d081faa1e18424 Author: Ruifeng Zheng AuthorDate: Mon Oct 17 19:22:51 2022 +0800 [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache ### What changes were proposed in this pull request? 1. On the SQL side: - Make AQE explicitly invoke `stage.cleanupResources()` at the driver after each stage finishes. The existing AQE `cleanupResources` has the following limitations: -- the method `cleanupResources` is only implemented in `SortExec`, and it is only called in `SortMergeJoinExec`; so a physical operator's `cleanupResources` can be called iff it is a descendant of `SortMergeJoinExec`; -- the above invocations in `SortMergeJoinScanner` happen on the executor side, so a physical operator's `cleanupResources` is never invoked at the driver; - Invoke `plan.cleanupResources()` in `QueryStageExec`, since it is itself a leaf; 2. On the Pandas-API-on-Spark side: - apply `persist` instead of `localCheckpoint`; - add a new config `compute.default_index_cache` to control the storage level of temporary RDDs; - unpersist the cached RDD in `cleanupResources` in `AttachDistributedSequenceExec` ### Why are the changes needed? `distributed_sequence` is the default indexing in Pandas-API-on-Spark; it will `localCheckpoint` (and also cache internally) a temporary RDD to avoid re-computation. For large-scale datasets, it is prone to fail due to unreliable checkpointing: ``` Caused by: org.apache.spark.SparkException: Checkpoint block rdd_448_38 not found!
Either the executor that originally checkpointed this partition is no longer alive, or the original RDD is unpersisted. If this problem persists, you may consider using `rdd.checkpoint()` instead, which is slower than local checkpointing but more fault-tolerant. at org.apache.spark.errors.SparkCoreErrors$.checkpointRDDBlockIdNotFoundError(SparkCoreErrors.scala:82) at ``` we should use `persist` instead, and clean the cached RDDs ASAP. ### Does this PR introduce _any_ user-facing change? Yes, a new config `compute.default_index_cache` ### How was this patch tested? added UT Closes #38130 from zhengruifeng/ps_indexing_cleanup. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../source/user_guide/pandas_on_spark/options.rst | 11 +++- python/pyspark/pandas/config.py| 37 python/pyspark/pandas/tests/test_default_index.py | 53 + .../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 + .../sql/execution/adaptive/QueryStageExec.scala| 5 ++ .../python/AttachDistributedSequenceExec.scala | 67 +++--- 6 files changed, 167 insertions(+), 8 deletions(-) diff --git a/python/docs/source/user_guide/pandas_on_spark/options.rst b/python/docs/source/user_guide/pandas_on_spark/options.rst index 67b8f6841f5..cbad934c844 100644 --- a/python/docs/source/user_guide/pandas_on_spark/options.rst +++ b/python/docs/source/user_guide/pandas_on_spark/options.rst @@ -271,6 +271,14 @@ compute.ops_on_diff_frames False This determines whether that method throws an exception. compute.default_index_type 'distributed-sequence' This sets the default index type: sequence, distributed and distributed-sequence.
+compute.default_index_cache 'MEMORY_AND_DISK_SER' This sets the default storage level for temporary +RDDs cached in distributed-sequence indexing: 'NONE', +'DISK_ONLY', 'DISK_ONLY_2', 'DISK_ONLY_3', +'MEMORY_ONLY', 'MEMORY_ONLY_2', 'MEMORY_ONLY_SER', +'MEMORY_ONLY_SER_2', 'MEMORY_AND_DISK', +'MEMORY_AND_DISK_2', 'MEMORY_AND_DISK_SER', + 'MEMORY_AND_DISK_SER_2', 'OFF_HEAP', +'LOCAL_CHECKPOINT'. compute.ordered_headFalse 'compute.ordered_head' sets whether or not to operate head with natural ordering. pandas-on-Spark does
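The storage-level names listed above form a closed set of allowed values. A minimal pure-Python sketch of validating such an option value (the helper name here is hypothetical; the real option machinery lives in `python/pyspark/pandas/config.py`):

```python
# Allowed values for the compute.default_index_cache option,
# exactly as enumerated in the options.rst diff above.
ALLOWED_CACHE_LEVELS = {
    "NONE", "DISK_ONLY", "DISK_ONLY_2", "DISK_ONLY_3",
    "MEMORY_ONLY", "MEMORY_ONLY_2", "MEMORY_ONLY_SER", "MEMORY_ONLY_SER_2",
    "MEMORY_AND_DISK", "MEMORY_AND_DISK_2", "MEMORY_AND_DISK_SER",
    "MEMORY_AND_DISK_SER_2", "OFF_HEAP", "LOCAL_CHECKPOINT",
}

def check_default_index_cache(value: str) -> str:
    # Hypothetical validator: reject anything outside the documented set.
    if value not in ALLOWED_CACHE_LEVELS:
        raise ValueError(f"Unknown storage level: {value!r}")
    return value
```

In an actual session the option would be set the same way as other pandas-on-Spark options, e.g. `ps.set_option("compute.default_index_cache", "MEMORY_ONLY")`.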
[spark] branch master updated: [SPARK-40796][CONNECT][BUILD] Check the generated python protos in GitHub Actions
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 014718f8923 [SPARK-40796][CONNECT][BUILD] Check the generated python protos in GitHub Actions 014718f8923 is described below commit 014718f8923d4bbaf85dd9de31b87adacac1e9a8 Author: Ruifeng Zheng AuthorDate: Mon Oct 17 14:19:59 2022 +0800 [SPARK-40796][CONNECT][BUILD] Check the generated python protos in GitHub Actions ### What changes were proposed in this pull request? Check the generated python protos in GitHub Actions ### Why are the changes needed? 1, `generate_protos.sh`: - add an optional argument to specify the output path; - update the `sed -i ''` commands to make them work on both macOS and Linux; 2, `dev/check-codegen-python.py`: - can be run locally to check whether the generated files are out of sync; - add it to CI to detect inconsistent protos; ![image](https://user-images.githubusercontent.com/7322292/195820972-c254974b-02de-43d7-a13d-fc532cd472fd.png) 3, python lint - after installing `mypy-protobuf`, lint needed fixing in `python/pyspark/sql/connect/plan.py` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? manual tests and CI Closes #38253 from zhengruifeng/dev_check_codegen. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .github/workflows/build_and_test.yml | 12 - connector/connect/dev/generate_protos.sh | 24 +++-- dev/check-codegen-python.py | 84 python/pyspark/sql/connect/plan.py | 16 +++--- 4 files changed, 124 insertions(+), 12 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 46e3524b38c..134cdae7029 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -556,6 +556,12 @@ jobs: # See also https://issues.apache.org/jira/browse/SPARK-35375.
python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.920' 'pytest-mypy-plugins==1.9.3' numpydoc 'jinja2<3.0.0' 'black==22.6.0' python3.9 -m pip install 'pandas-stubs==1.2.0.53' +- name: Install dependencies for Python code generation check + run: | +# See more in "Installation" https://docs.buf.build/installation#tarball +curl -LO https://github.com/bufbuild/buf/releases/download/v1.8.0/buf-Linux-x86_64.tar.gz +mkdir -p $HOME/buf +tar -xvzf buf-Linux-x86_64.tar.gz -C $HOME/buf --strip-components 1 - name: Install R linter dependencies and SparkR run: | apt update @@ -565,7 +571,7 @@ jobs: Rscript -e "install.packages(c('devtools'), repos='https://cloud.r-project.org/')" Rscript -e "devtools::install_version('lintr', version='2.0.1', repos='https://cloud.r-project.org')" ./R/install-dev.sh -- name: Instll JavaScript linter dependencies +- name: Install JavaScript linter dependencies run: | apt update apt-get install -y nodejs npm @@ -581,7 +587,7 @@ jobs: # See also https://issues.apache.org/jira/browse/SPARK-38279. 
python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme ipython nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0' python3.9 -m pip install ipython_genutils # See SPARK-38517 -python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' grpcio protobuf +python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' grpcio protobuf mypy-protobuf python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421 apt-get update -y apt-get install -y ruby ruby-dev @@ -601,6 +607,8 @@ jobs: run: ./dev/lint-java - name: Python linter run: PYTHON_EXECUTABLE=python3.9 ./dev/lint-python +- name: Python code generation check + run: PATH=$PATH:$HOME/buf/bin PYTHON_EXECUTABLE=python3.9 ./dev/check-codegen-python.py - name: R linter run: ./dev/lint-r - name: JS linter diff --git a/connector/connect/dev/generate_protos.sh b/connector/connect/dev/generate_protos.sh index 9457e7b33ed..d327fdb0dac 100755 --- a/connector/connect/dev/generate_protos.sh +++ b/connector/connect/dev/generate_protos.sh @@ -18,9 +18,24 @@ # set -ex +if [[ $# -gt 1 ]]; then + echo "Illegal number of parameters." + echo "Usage: ./connector/connect/dev/generate_protos.sh [path]" + exit -1 +fi + + SPARK_HOME="$(cd "`dirname $0`"/../../..; pwd)" cd "$SPARK_HOME" + +OUTPUT_PATH=${SPARK_HOME}/python/pys
[spark] branch master updated (116f6a719ea -> 633c333aa50)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 116f6a719ea [SPARK-40534][CONNECT][FOLLOW-UP] Support all join types in python client add 633c333aa50 [SPARK-40737][CONNECT][FOLLOW-UP] Regenerate protos for `DataFrameWriter` No new revisions were added by this update. Summary of changes: python/pyspark/sql/connect/proto/commands_pb2.py | 26 ++- python/pyspark/sql/connect/proto/commands_pb2.pyi | 184 +- 2 files changed, 200 insertions(+), 10 deletions(-)
[spark] branch master updated: [SPARK-40534][CONNECT][FOLLOW-UP] Support all join types in python client
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 116f6a719ea [SPARK-40534][CONNECT][FOLLOW-UP] Support all join types in python client 116f6a719ea is described below commit 116f6a719eaf1220d8eed98433f724fca672acc6 Author: Rui Wang AuthorDate: Mon Oct 17 10:27:58 2022 +0800 [SPARK-40534][CONNECT][FOLLOW-UP] Support all join types in python client ### What changes were proposed in this pull request? Following up https://github.com/apache/spark/pull/38157, add all join types support for python. Please note that in the PR we decided to not support CROSS Join now. ### Why are the changes needed? Finish the join type support in Connect. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT Closes #38243 from amaliujia/python_join_types. Authored-by: Rui Wang Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/dataframe.py| 2 +- python/pyspark/sql/connect/plan.py | 32 +++--- .../sql/tests/connect/test_connect_select_ops.py | 18 +++- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index bb74282cfda..31215b4da79 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -183,7 +183,7 @@ class DataFrame(object): return self.toPandas() # TODO(martin.grund) fix mypu -def join(self, other: "DataFrame", on: Any, how: Any = None) -> "DataFrame": +def join(self, other: "DataFrame", on: Any, how: Optional[str] = None) -> "DataFrame": if self._plan is None: raise Exception("Cannot join when self._plan is empty.") if other._plan is None: diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 67ed6b964fa..486778b9d37 100644 --- a/python/pyspark/sql/connect/plan.py +++ 
b/python/pyspark/sql/connect/plan.py @@ -368,21 +368,45 @@ class Join(LogicalPlan): left: Optional["LogicalPlan"], right: "LogicalPlan", on: "ColumnOrString", -how: proto.Join.JoinType.ValueType = proto.Join.JoinType.JOIN_TYPE_INNER, +how: Optional[str], ) -> None: super().__init__(left) self.left = cast(LogicalPlan, left) self.right = right self.on = on if how is None: -how = proto.Join.JoinType.JOIN_TYPE_INNER -self.how = how +join_type = proto.Join.JoinType.JOIN_TYPE_INNER +elif how == "inner": +join_type = proto.Join.JoinType.JOIN_TYPE_INNER +elif how in ["outer", "full", "fullouter"]: +join_type = proto.Join.JoinType.JOIN_TYPE_FULL_OUTER +elif how in ["leftouter", "left"]: +join_type = proto.Join.JoinType.JOIN_TYPE_LEFT_OUTER +elif how in ["rightouter", "right"]: +join_type = proto.Join.JoinType.JOIN_TYPE_RIGHT_OUTER +elif how in ["leftsemi", "semi"]: +join_type = proto.Join.JoinType.JOIN_TYPE_LEFT_SEMI +elif how in ["leftanti", "anti"]: +join_type = proto.Join.JoinType.JOIN_TYPE_LEFT_ANTI +else: +raise NotImplementedError( +""" +Unsupported join type: %s. Supported join types include: +"inner", "outer", "full", "fullouter", "full_outer", +"leftouter", "left", "left_outer", "rightouter", +"right", "right_outer", "leftsemi", "left_semi", +"semi", "leftanti", "left_anti", "anti", +""" +% how +) +self.how = join_type def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: rel = proto.Relation() rel.join.left.CopyFrom(self.left.plan(session)) rel.join.right.CopyFrom(self.right.plan(session)) -rel.join.on.CopyFrom(self.to_attr_or_expression(self.on, session)) +rel.join.join_condition.CopyFrom(self.to_attr_or_expression(self.on, session)) +rel.join.join_type = self.how return rel def print(self, indent: int = 0) -> str: diff --git a/python/pyspark/sql/tests/connect/test_connect_select_ops.py b/python/pyspar
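The accepted `how` strings and their proto join types can be summarized in a small stand-alone sketch of the branching added above (string names stand in for the `proto.Join.JoinType` enum values; CROSS is deliberately unsupported, as the PR description notes):

```python
def to_join_type(how):
    """Map a user-facing join string to a Connect proto join-type name.

    Mirrors the branching added to plan.py above; the returned string
    constants stand in for proto.Join.JoinType enum values.
    """
    if how is None or how == "inner":
        return "JOIN_TYPE_INNER"
    if how in ("outer", "full", "fullouter"):
        return "JOIN_TYPE_FULL_OUTER"
    if how in ("leftouter", "left"):
        return "JOIN_TYPE_LEFT_OUTER"
    if how in ("rightouter", "right"):
        return "JOIN_TYPE_RIGHT_OUTER"
    if how in ("leftsemi", "semi"):
        return "JOIN_TYPE_LEFT_SEMI"
    if how in ("leftanti", "anti"):
        return "JOIN_TYPE_LEFT_ANTI"
    # Anything else, including "cross", is rejected for now.
    raise NotImplementedError(f"Unsupported join type: {how}")
```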
[spark] branch master updated: [SPARK-40557][CONNECT][FOLLOW-UP] Sync python generated proto files
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 39e08b10c24 [SPARK-40557][CONNECT][FOLLOW-UP] Sync python generated proto files 39e08b10c24 is described below commit 39e08b10c246ca1d47ed6adb8992802bd1113657 Author: Rui Wang AuthorDate: Fri Oct 14 12:03:11 2022 +0800 [SPARK-40557][CONNECT][FOLLOW-UP] Sync python generated proto files ### What changes were proposed in this pull request? This PR syncs python generated proto files. The proto changes in this PR are generated by https://github.com/apache/spark/blob/master/connector/connect/dev/generate_protos.sh. ### Why are the changes needed? The Python client-side proto files are out of sync. Other Python-related PRs need to re-generate the proto files, which has caused trouble during code review. We are looking for ways to automatically keep the python proto files in sync. Before that is done, we need to manually update the proto files. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38244 from amaliujia/sync_python_proto.
Authored-by: Rui Wang Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/plan.py | 4 +- python/pyspark/sql/connect/proto/commands_pb2.py | 8 +- python/pyspark/sql/connect/proto/commands_pb2.pyi | 8 +- .../pyspark/sql/connect/proto/expressions_pb2.py | 62 +++--- .../pyspark/sql/connect/proto/expressions_pb2.pyi | 90 +++- python/pyspark/sql/connect/proto/relations_pb2.py | 82 python/pyspark/sql/connect/proto/relations_pb2.pyi | 164 --- python/pyspark/sql/connect/proto/types_pb2.py | 102 - python/pyspark/sql/connect/proto/types_pb2.pyi | 232 +++-- 9 files changed, 424 insertions(+), 328 deletions(-) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 09f6680a416..67ed6b964fa 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -319,9 +319,9 @@ class Aggregate(LogicalPlan): def _convert_measure( self, m: MeasureType, session: Optional["RemoteSparkSession"] -) -> proto.Aggregate.Measure: +) -> proto.Aggregate.AggregateFunction: exp, fun = m -measure = proto.Aggregate.Measure() +measure = proto.Aggregate.AggregateFunction() measure.function.name = fun if type(exp) is str: measure.function.arguments.append(self.unresolved_attr(exp)) diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py index 46d405dd008..875f5d02db2 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.py +++ b/python/pyspark/sql/connect/proto/commands_pb2.py @@ -32,7 +32,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19spark/connect/types.proto"i\n\x07\x43ommand\x12N\n\x0f\x63reate_function\x18\x01 \x01(\x0b\x32#.spark.connect.CreateScalarFunctionH\x00R\x0e\x63reateFunctionB\x0e\n\x0c\x63ommand_type"\x8f\x04\n\x14\x43reateScalarFunction\x12\x14\n\x05parts\x18\x01 
\x03(\tR\x05parts\x12P\n\x08language\x18\x02 \x01(\x0e\x32\x34.spark.connect.CreateScalarFunction.FunctionLanguageR\x08language\x12\x1c\n\ttemporary\x18\x03 \x01(\x08R\ttempora [...] + b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19spark/connect/types.proto"i\n\x07\x43ommand\x12N\n\x0f\x63reate_function\x18\x01 \x01(\x0b\x32#.spark.connect.CreateScalarFunctionH\x00R\x0e\x63reateFunctionB\x0e\n\x0c\x63ommand_type"\x97\x04\n\x14\x43reateScalarFunction\x12\x14\n\x05parts\x18\x01 \x03(\tR\x05parts\x12P\n\x08language\x18\x02 \x01(\x0e\x32\x34.spark.connect.CreateScalarFunction.FunctionLanguageR\x08language\x12\x1c\n\ttemporary\x18\x03 \x01(\x08R\ttempora [...] ) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) @@ -44,7 +44,7 @@ if _descriptor._USE_C_DESCRIPTORS == False: _COMMAND._serialized_start = 74 _COMMAND._serialized_end = 179 _CREATESCALARFUNCTION._serialized_start = 182 -_CREATESCALARFUNCTION._serialized_end = 709 -_CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_start = 547 -_CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_end = 686 +_CREATESCALARFUNCTION._serialized_end = 717 +_CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_start = 555 +_CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_end = 694 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/prot
[spark] branch master updated: [SPARK-40724][PS][FOLLOW-UP] Simplify `corrwith` with method `inline`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 45d537ddeba [SPARK-40724][PS][FOLLOW-UP] Simplify `corrwith` with method `inline` 45d537ddeba is described below commit 45d537ddeba2835449972d08a5a65d8276ec2978 Author: Ruifeng Zheng AuthorDate: Thu Oct 13 08:47:29 2022 +0800 [SPARK-40724][PS][FOLLOW-UP] Simplify `corrwith` with method `inline` ### What changes were proposed in this pull request? Use `inline` instead of `explode` in `corrwith` ### Why are the changes needed? do not need the temporary column ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing UTs Closes #38221 from zhengruifeng/ps_df_corrwith_inline. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/frame.py | 11 +-- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 1aa94623ac3..835c13d6fdd 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -1747,7 +1747,6 @@ class DataFrame(Frame, Generic[T]): sdf = combined._internal.spark_frame index_col_name = verify_temp_column_name(sdf, "__corrwith_index_temp_column__") -tuple_col_name = verify_temp_column_name(sdf, "__corrwith_tuple_temp_column__") this_numeric_column_labels: List[Label] = [] for column_label in this._internal.column_labels: @@ -1797,15 +1796,7 @@ class DataFrame(Frame, Generic[T]): ) if len(pair_scols) > 0: -sdf = sdf.select(F.explode(F.array(*pair_scols)).alias(tuple_col_name)).select( - F.col(f"{tuple_col_name}.{index_col_name}").alias(index_col_name), -F.col(f"{tuple_col_name}.{CORRELATION_VALUE_1_COLUMN}").alias( -CORRELATION_VALUE_1_COLUMN -), -F.col(f"{tuple_col_name}.{CORRELATION_VALUE_2_COLUMN}").alias( -CORRELATION_VALUE_2_COLUMN -), -) +sdf = 
sdf.select(F.inline(F.array(*pair_scols))) sdf = compute(sdf=sdf, groupKeys=[index_col_name], method=method).select( index_col_name, CORRELATION_CORR_OUTPUT_COLUMN
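The simplification works because `F.inline(F.array(...))` expands an array of structs straight into top-level columns, whereas `F.explode` keeps each struct nested under a single column that must then be projected field by field. A pure-Python model of the two shapes (lists of dicts standing in for rows; an illustration, not the Spark API):

```python
def explode_structs(rows, array_col, out_col):
    # explode: one output row per struct, still nested under out_col,
    # so follow-up projections like out_col.field are needed
    return [{out_col: struct} for row in rows for struct in row[array_col]]

def inline_structs(rows, array_col):
    # inline: one output row per struct, with the struct's fields
    # promoted directly to top-level columns
    return [dict(struct) for row in rows for struct in row[array_col]]

rows = [{"pairs": [{"a": 1, "b": 2}, {"a": 3, "b": 4}]}]
exploded = explode_structs(rows, "pairs", "tmp")  # needs tmp.a, tmp.b projections
inlined = inline_structs(rows, "pairs")           # columns a, b directly
```

This is why the temporary `__corrwith_tuple_temp_column__` and its three per-field projections could be deleted.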
[spark] branch master updated (cb53e34343b -> 18f1f53d303)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from cb53e34343b [MINOR][BUILD] Add `dist` dir to `fileset` of `maven-clean-plugin` add 18f1f53d303 [SPARK-38774][PS][FOLLOW-UP] Make parameter name in `Series.autocorr` consistent with Pandas No new revisions were added by this update. Summary of changes: python/pyspark/pandas/series.py| 12 ++-- python/pyspark/pandas/tests/test_series.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-)
[spark] branch master updated (67c6408f133 -> f8d68b00071)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 67c6408f133 [SPARK-40534][CONNECT] Extend the support for Join with different join types add f8d68b00071 [SPARK-40725][INFRA][FOLLOWUP] Mark mypy-protobuf as an optional dependency for Spark Connect No new revisions were added by this update. Summary of changes: dev/requirements.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-)
[spark] branch master updated (d323fa8fc07 -> 76a7c5a3625)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from d323fa8fc07 [SPARK-40448][CONNECT][FOLLOWUP] Use more suitable variable name and fix code style add 76a7c5a3625 [SPARK-40724][PS] Simplify `corr` with method `inline` No new revisions were added by this update. Summary of changes: python/pyspark/pandas/frame.py | 23 +++ 1 file changed, 7 insertions(+), 16 deletions(-)
[spark] branch master updated (e2176338c9b -> 03b055fa4e5)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e2176338c9b [SPARK-40665][CONNECT] Avoid embedding Spark Connect in the Apache Spark binary release add 03b055fa4e5 [SPARK-40643][PS] Implement `min_count` in `GroupBy.last` No new revisions were added by this update. Summary of changes: python/pyspark/pandas/groupby.py| 48 + python/pyspark/pandas/tests/test_groupby.py | 17 ++ 2 files changed, 59 insertions(+), 6 deletions(-)
[spark] branch master updated (77f5c8442b1 -> 7bcf4b9b8d0)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 77f5c8442b1 [SPARK-40577][PS] Fix `CategoricalIndex.append` to match pandas 1.5.0 add 7bcf4b9b8d0 [SPARK-40606][PS][TEST] Eliminate `to_pandas` warnings in test No new revisions were added by this update. Summary of changes: .../pandas/tests/data_type_ops/test_binary_ops.py | 2 +- .../pandas/tests/data_type_ops/test_boolean_ops.py | 2 +- .../tests/data_type_ops/test_categorical_ops.py| 2 +- .../pandas/tests/data_type_ops/test_complex_ops.py | 2 +- .../pandas/tests/data_type_ops/test_date_ops.py| 2 +- .../tests/data_type_ops/test_datetime_ops.py | 2 +- .../pandas/tests/data_type_ops/test_null_ops.py| 2 +- .../pandas/tests/data_type_ops/test_num_ops.py | 6 +-- .../pandas/tests/data_type_ops/test_string_ops.py | 4 +- .../tests/data_type_ops/test_timedelta_ops.py | 2 +- .../pandas/tests/data_type_ops/test_udt_ops.py | 2 +- python/pyspark/pandas/tests/indexes/test_base.py | 16 +++ .../pandas/tests/plot/test_frame_plot_plotly.py| 2 +- .../tests/plot/test_series_plot_matplotlib.py | 2 +- .../pandas/tests/plot/test_series_plot_plotly.py | 6 +-- python/pyspark/pandas/tests/test_dataframe.py | 52 +++--- .../pandas/tests/test_dataframe_spark_io.py| 8 ++-- python/pyspark/pandas/tests/test_default_index.py | 2 +- python/pyspark/pandas/tests/test_groupby.py| 6 +-- python/pyspark/pandas/tests/test_indexing.py | 4 +- .../pandas/tests/test_ops_on_diff_frames.py| 8 ++-- python/pyspark/pandas/tests/test_repr.py | 48 ++-- python/pyspark/pandas/tests/test_series.py | 4 +- .../pyspark/pandas/tests/test_series_datetime.py | 6 +-- 24 files changed, 97 insertions(+), 95 deletions(-)
[spark] branch master updated: [SPARK-40589][PS][TEST] Fix test for `DataFrame.corr_with` skip the pandas regression
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e617503c3f0 [SPARK-40589][PS][TEST] Fix test for `DataFrame.corr_with` skip the pandas regression e617503c3f0 is described below commit e617503c3f06be9eea0af529bab7984fc07e87a2 Author: itholic AuthorDate: Fri Sep 30 09:45:57 2022 +0800 [SPARK-40589][PS][TEST] Fix test for `DataFrame.corr_with` skip the pandas regression ### What changes were proposed in this pull request? This PR proposes to skip the `DataFrame.corr_with` test when the `other` is a `pyspark.pandas.Series` and the `method` is "spearman" or "pearson", since there is a regression in pandas 1.5.0 for those cases. ### Why are the changes needed? There are some regressions in pandas 1.5.0, so we're not going to match the behavior for those cases. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually tested with pandas 1.5.0, confirmed the test passes. Closes #38031 from itholic/SPARK-40589.
Authored-by: itholic Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/tests/test_dataframe.py | 9 - 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py index 5da0974c906..dfac3c6d1b8 100644 --- a/python/pyspark/pandas/tests/test_dataframe.py +++ b/python/pyspark/pandas/tests/test_dataframe.py @@ -6076,7 +6076,14 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils): def _test_corrwith(self, psdf, psobj): pdf = psdf.to_pandas() pobj = psobj.to_pandas() -for method in ["pearson", "spearman", "kendall"]: +# Regression in pandas 1.5.0 when other is Series and method is "pearson" or "spearman" +# See https://github.com/pandas-dev/pandas/issues/48826 for the reported issue, +# and https://github.com/pandas-dev/pandas/pull/46174 for the initial PR that causes. +if LooseVersion(pd.__version__) >= LooseVersion("1.5.0") and isinstance(pobj, pd.Series): +methods = ["kendall"] +else: +methods = ["pearson", "spearman", "kendall"] +for method in methods: for drop in [True, False]: p_corr = pdf.corrwith(pobj, drop=drop, method=method) ps_corr = psdf.corrwith(psobj, drop=drop, method=method)
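The gating logic added above can be sketched stand-alone (simplified: a plain tuple comparison instead of `LooseVersion`, a boolean flag instead of the `isinstance` check, and assuming plain `X.Y.Z` version strings):

```python
def methods_to_test(pandas_version, other_is_series):
    """Return the corrwith methods safe to compare against pandas.

    Simplified sketch of the gating in test_dataframe.py above:
    pandas >= 1.5.0 has a regression for Series with 'pearson' and
    'spearman' (pandas-dev/pandas#48826), so only 'kendall' is
    checked in that case.
    """
    version = tuple(int(p) for p in pandas_version.split(".")[:3])
    if version >= (1, 5, 0) and other_is_series:
        return ["kendall"]
    return ["pearson", "spearman", "kendall"]
```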
[spark] branch master updated (ab49dc21e43 -> 728f7e9841c)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from ab49dc21e43 [SPARK-39853][CORE] Support stage level task resource profile for standalone cluster when dynamic allocation disabled add 728f7e9841c [SPARK-40314][SQL][PYTHON] Add scala and python bindings for inline and inline_outer No new revisions were added by this update. Summary of changes: .../source/reference/pyspark.sql/functions.rst | 2 + python/pyspark/sql/functions.py| 76 ++ python/pyspark/sql/tests/test_functions.py | 16 + .../scala/org/apache/spark/sql/functions.scala | 17 + .../apache/spark/sql/GeneratorFunctionSuite.scala | 32 - 5 files changed, 127 insertions(+), 16 deletions(-)
[spark] branch master updated (31aadc4687c -> 7e5a87f0128)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 31aadc4687c [SPARK-40580][PS][DOCS] Update the document for `DataFrame.to_orc` add 7e5a87f0128 [SPARK-40604][PS] Verify the temporary column names No new revisions were added by this update. Summary of changes: python/pyspark/pandas/frame.py | 10 +- python/pyspark/pandas/series.py | 9 + 2 files changed, 10 insertions(+), 9 deletions(-)
[spark] branch master updated (38599e9a368 -> c28740a204c)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 38599e9a368 [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState add c28740a204c [SPARK-40592][PS] Implement `min_count` in `GroupBy.max` No new revisions were added by this update. Summary of changes: python/pyspark/pandas/groupby.py| 42 ++--- python/pyspark/pandas/tests/test_groupby.py | 2 ++ 2 files changed, 40 insertions(+), 4 deletions(-)
[spark] branch master updated: [SPARK-40575][DOCS] Add badges for PySpark downloads
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8e0a332bf0e [SPARK-40575][DOCS] Add badges for PySpark downloads 8e0a332bf0e is described below commit 8e0a332bf0e7357ddc97ad8d99caf78ae567981b Author: Ruifeng Zheng AuthorDate: Wed Sep 28 14:00:24 2022 +0800 [SPARK-40575][DOCS] Add badges for PySpark downloads ### What changes were proposed in this pull request? Add badges for PySpark downloads ### Why are the changes needed? famous python projects like [pandas](https://github.com/pandas-dev/pandas#pandas-powerful-python-data-analysis-toolkit) contains these badges ### Does this PR introduce _any_ user-facing change? https://user-images.githubusercontent.com/7322292/192681316-cbaa7f02-6a9c-46d0-a290-b3ee6778fff4.png;> ### How was this patch tested? manually check Closes #38014 from zhengruifeng/doc_pyspark_downloads. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index f7bc1994fc8..310df41f465 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ and Structured Streaming for stream processing. 
[![GitHub Actions Build](https://github.com/apache/spark/actions/workflows/build_main.yml/badge.svg)](https://github.com/apache/spark/actions/workflows/build_main.yml) [![AppVeyor Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic&logo=appveyor)](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark) [![PySpark Coverage](https://codecov.io/gh/apache/spark/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/spark) +[![PyPI Downloads](https://static.pepy.tech/personalized-badge/pyspark?period=month&units=international_system&left_color=black&right_color=orange&left_text=PyPI%20downloads)](https://pypi.org/project/pyspark/) ## Online Documentation
[spark] branch master updated (f2ba6b55c1d -> 311a855884c)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from f2ba6b55c1d [SPARK-40573][PS] Make `ddof` in `GroupBy.std`, `GroupBy.var` and `GroupBy.sem` accept arbitrary integers add 311a855884c [MINOR] Clarify that xxhash64 seed is 42 No new revisions were added by this update. Summary of changes: R/pkg/R/functions.R| 2 +- python/pyspark/sql/functions.py| 2 +- .../main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala| 3 ++- sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-)
[spark] branch master updated (072575c9e6f -> f2ba6b55c1d)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 072575c9e6f [SPARK-40557][CONNECT] Update generated proto files for Spark Connect add f2ba6b55c1d [SPARK-40573][PS] Make `ddof` in `GroupBy.std`, `GroupBy.var` and `GroupBy.sem` accept arbitrary integers No new revisions were added by this update. Summary of changes: python/pyspark/pandas/groupby.py| 41 ++--- python/pyspark/pandas/tests/test_groupby.py | 2 +- 2 files changed, 27 insertions(+), 16 deletions(-)
[spark] branch master updated: [SPARK-40561][PS] Implement `min_count` in `GroupBy.min`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 211ce40888d [SPARK-40561][PS] Implement `min_count` in `GroupBy.min` 211ce40888d is described below commit 211ce40888dcaaa3c3ffbd316109e17d0caad4e3 Author: Ruifeng Zheng AuthorDate: Tue Sep 27 09:53:09 2022 +0800 [SPARK-40561][PS] Implement `min_count` in `GroupBy.min` ### What changes were proposed in this pull request? Implement `min_count` in `GroupBy.min` ### Why are the changes needed? for API coverage ### Does this PR introduce _any_ user-facing change? yes, new parameter `min_count` supported ``` >>> df.groupby("D").min(min_count=3).sort_index() A BC D a 1.0 False 3.0 b NaN None NaN ``` ### How was this patch tested? added UT and doctest Closes #37998 from zhengruifeng/ps_groupby_min. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/groupby.py| 41 ++--- python/pyspark/pandas/tests/test_groupby.py | 2 ++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index 6d36cfecce6..7085d2ec059 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -643,16 +643,23 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): bool_to_numeric=True, ) -def min(self, numeric_only: Optional[bool] = False) -> FrameLike: +def min(self, numeric_only: Optional[bool] = False, min_count: int = -1) -> FrameLike: """ Compute min of group values. +.. versionadded:: 3.3.0 + Parameters -- numeric_only : bool, default False Include only float, int, boolean columns. If None, will attempt to use everything, then use only numeric data. +.. versionadded:: 3.4.0 +min_count : bool, default -1 +The required number of valid values to perform the operation. 
If fewer +than min_count non-NA values are present the result will be NA. + .. versionadded:: 3.4.0 See Also @@ -663,7 +670,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): Examples >>> df = ps.DataFrame({"A": [1, 2, 1, 2], "B": [True, False, False, True], -..."C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]}) +..."C": [3, 4, 3, 4], "D": ["a", "a", "b", "a"]}) >>> df.groupby("A").min().sort_index() B C D A @@ -677,9 +684,37 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): A 1 False 3 2 False 4 + +>>> df.groupby("D").min().sort_index() + A B C +D +a 1 False 3 +b 1 False 3 + + +>>> df.groupby("D").min(min_count=3).sort_index() + A BC +D +a 1.0 False 3.0 +b NaN None NaN """ +if not isinstance(min_count, int): +raise TypeError("min_count must be integer") + +if min_count > 0: + +def min(col: Column) -> Column: +return F.when( +F.count(F.when(~F.isnull(col), F.lit(0))) < min_count, F.lit(None) +).otherwise(F.min(col)) + +else: + +def min(col: Column) -> Column: +return F.min(col) + return self._reduce_for_stat_function( -F.min, accepted_spark_types=(NumericType, BooleanType) if numeric_only else None +min, accepted_spark_types=(NumericType, BooleanType) if numeric_only else None ) # TODO: sync the doc. diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py index 4a57a3421df..f0b3a04be17 100644 --- a/python/pyspark/pandas/tests/test_groupby.py +++ b/python/pyspark/pandas/tests/test_groupby.py @@ -1401,8 +1401,10 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils): def test_min(self): self._test_stat_func(lambda groupby_obj: groupby_obj.min()) +self._test_stat_func(lambda groupby_obj: groupby_obj.min(min_count=2)) self._test_stat_func(lambda groupby_obj: groupby_obj.min(numeric_only=Non
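The `min_count` masking added in this diff (count a group's non-null values, and return null when that count is below `min_count`) can be modeled in plain Python. This sketch illustrates the semantics only, not the Spark `Column` expressions the commit actually builds:

```python
import math

def group_min(values, min_count=-1):
    # NA values (None / NaN) are dropped first, as Spark aggregates do.
    valid = [v for v in values
             if v is not None and not (isinstance(v, float) and math.isnan(v))]
    # With fewer than min_count valid values the result is NA (None here),
    # matching F.when(F.count(...) < min_count, F.lit(None)).otherwise(F.min(col)).
    if min_count > 0 and len(valid) < min_count:
        return None
    return min(valid) if valid else None

print(group_min([1.0, float("nan"), 3.0]))               # 1.0
print(group_min([1.0, float("nan"), 3.0], min_count=3))  # None
```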
[spark] branch master updated (b4014eb13a1 -> 0deba9fdd1a)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from b4014eb13a1 [SPARK-40330][PS] Implement `Series.searchsorted` add 0deba9fdd1a [SPARK-40554][PS] Make `ddof` in `DataFrame.sem` and `Series.sem` accept arbitrary integers No new revisions were added by this update. Summary of changes: python/pyspark/pandas/generic.py | 18 +- python/pyspark/pandas/tests/test_stats.py | 6 ++ 2 files changed, 19 insertions(+), 5 deletions(-)
[spark] branch master updated: [SPARK-40334][PS] Implement `GroupBy.prod`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c01e524c298 [SPARK-40334][PS] Implement `GroupBy.prod` c01e524c298 is described below commit c01e524c2985be06027191e51bb94d9ee5637d40 Author: artsiomyudovin AuthorDate: Mon Sep 26 08:00:20 2022 +0800 [SPARK-40334][PS] Implement `GroupBy.prod` ### What changes were proposed in this pull request? Implement `GroupBy.prod` ### Why are the changes needed? for API coverage ### Does this PR introduce _any_ user-facing change? yes, the new API ``` df = ps.DataFrame({'A': [1, 1, 2, 1, 2], 'B': [np.nan, 2, 3, 4, 5], 'C': [1, 2, 1, 1, 2], 'D': [True, False, True, False, True]}) Groupby one column and return the prod of the remaining columns in each group. df.groupby('A').prod() B C D A 1 8.0 2 0 2 15.0 2 11 df.groupby('A').prod(min_count=3) B C D A 1 NaN 2 0 2 NaN NaN NaN ``` ### How was this patch tested? added UT Closes #37923 from ayudovin/ps_group_by_prod. 
Authored-by: artsiomyudovin Signed-off-by: Ruifeng Zheng --- .../source/reference/pyspark.pandas/groupby.rst| 1 + python/pyspark/pandas/groupby.py | 106 - python/pyspark/pandas/missing/groupby.py | 2 - python/pyspark/pandas/tests/test_groupby.py| 10 ++ 4 files changed, 114 insertions(+), 5 deletions(-) diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst b/python/docs/source/reference/pyspark.pandas/groupby.rst index 4c29964966c..da1579fd723 100644 --- a/python/docs/source/reference/pyspark.pandas/groupby.rst +++ b/python/docs/source/reference/pyspark.pandas/groupby.rst @@ -74,6 +74,7 @@ Computations / Descriptive Stats GroupBy.median GroupBy.min GroupBy.nth + GroupBy.prod GroupBy.rank GroupBy.sem GroupBy.std diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index 2e5c9ab219a..6d36cfecce6 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -18,7 +18,6 @@ """ A wrapper for GroupedData to behave similar to pandas GroupBy. """ - from abc import ABCMeta, abstractmethod import inspect from collections import defaultdict, namedtuple @@ -63,6 +62,7 @@ from pyspark.sql.types import ( StructField, StructType, StringType, +IntegralType, ) from pyspark import pandas as ps # For running doctests and reference resolution in PyCharm. @@ -1055,6 +1055,106 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): return self._prepare_return(DataFrame(internal)) +def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0) -> FrameLike: +""" +Compute prod of groups. + +.. versionadded:: 3.4.0 + +Parameters +-- +numeric_only : bool, default False +Include only float, int, boolean columns. If None, will attempt to use +everything, then use only numeric data. + +min_count: int, default 0 +The required number of valid values to perform the operation. +If fewer than min_count non-NA values are present the result will be NA. 
+ +Returns +--- +Series or DataFrame +Computed prod of values within each group. + +See Also + +pyspark.pandas.Series.groupby +pyspark.pandas.DataFrame.groupby + +Examples + +>>> import numpy as np +>>> df = ps.DataFrame( +... { +... "A": [1, 1, 2, 1, 2], +... "B": [np.nan, 2, 3, 4, 5], +... "C": [1, 2, 1, 1, 2], +... "D": [True, False, True, False, True], +... } +... ) + +Groupby one column and return the prod of the remaining columns in +each group. + +>>> df.groupby('A').prod().sort_index() + B C D +A +1 8.0 2 0 +2 15.0 2 1 + +>>> df.groupby('A').prod(min_count=3).sort_index() + B C D +A +1 NaN 2.0 0.0 +2 NaN NaN NaN +""" + +self._validate_agg_columns(numeric_only=numeric_only, function_name="prod") + +groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))] +
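`GroupBy.prod` applies the same `min_count` rule to a product aggregate. A plain-Python sketch of the semantics (the commit's real implementation composes Spark `Column` expressions over grouped data):

```python
import math

def group_prod(values, min_count=0):
    # Drop NA values first, as Spark aggregates do.
    valid = [v for v in values
             if v is not None and not (isinstance(v, float) and math.isnan(v))]
    # Note the different default: prod uses min_count=0, while min uses -1.
    if len(valid) < min_count:
        return float("nan")
    result = 1
    for v in valid:
        result *= v
    return result

# Group A == 1 from the commit's example: B values [NaN, 2, 4]
print(group_prod([float("nan"), 2, 4]))               # 8
print(group_prod([float("nan"), 2, 4], min_count=3))  # nan
```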
[spark] branch master updated: [SPARK-40543][PS][SQL] Make `ddof` in `DataFrame.var` and `Series.var` accept arbitary integers
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new def8cca7d89 [SPARK-40543][PS][SQL] Make `ddof` in `DataFrame.var` and `Series.var` accept arbitary integers def8cca7d89 is described below commit def8cca7d891b95e56f60ea3c007398d3e21bcd1 Author: Ruifeng Zheng AuthorDate: Fri Sep 23 14:54:12 2022 +0800 [SPARK-40543][PS][SQL] Make `ddof` in `DataFrame.var` and `Series.var` accept arbitary integers ### What changes were proposed in this pull request? add a new `var` expression to support arbitary integeral `ddof` ### Why are the changes needed? for API coverage ### Does this PR introduce _any_ user-facing change? yes, it accept `ddof` other than {0, 1} before ``` In [1]: import pyspark.pandas as ps In [2]: import numpy as np In [3]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b']) In [4]: df.var(ddof=2) --- AssertionErrorTraceback (most recent call last) Cell In [4], line 1 > 1 df.var(ddof=2) File ~/Dev/spark/python/pyspark/pandas/generic.py:1958, in Frame.var(self, axis, ddof, numeric_only) 1904 def var( 1905 self, axis: Optional[Axis] = None, ddof: int = 1, numeric_only: bool = None 1906 ) -> Union[Scalar, "Series"]: 1907 """ 1908 Return unbiased variance. 1909 (...) 1956 0. 1957 """ -> 1958 assert ddof in (0, 1) 1960 axis = validate_axis(axis) 1962 if numeric_only is None and axis == 0: AssertionError: ``` after ``` In [4]: df.var(ddof=2) Out[4]: a2.00 b0.02 dtype: float64 In [5]: df.to_pandas().var(ddof=2) /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small. 
warnings.warn(message, PandasAPIOnSparkAdviceWarning) Out[5]: a2.00 b0.02 dtype: float64 ``` ### How was this patch tested? added UT Closes #37975 from zhengruifeng/ps_var_ddof. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/generic.py | 21 - python/pyspark/pandas/spark/functions.py | 5 + .../pyspark/pandas/tests/test_generic_functions.py | 7 +++ python/pyspark/pandas/tests/test_stats.py | 6 ++ .../expressions/aggregate/CentralMomentAgg.scala | 22 ++ .../spark/sql/api/python/PythonSQLUtils.scala | 4 6 files changed, 60 insertions(+), 5 deletions(-) diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py index 6ba967da7f5..49032f07c75 100644 --- a/python/pyspark/pandas/generic.py +++ b/python/pyspark/pandas/generic.py @@ -1907,6 +1907,8 @@ class Frame(object, metaclass=ABCMeta): """ Return unbiased variance. +.. versionadded:: 3.3.0 + Parameters -- axis : {index (0), columns (1)} @@ -1914,6 +1916,9 @@ class Frame(object, metaclass=ABCMeta): ddof : int, default 1 Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements. + +.. versionchanged:: 3.4.0 + Supported including arbitary integers. numeric_only : bool, default None Include only float, int, boolean columns. False is not supported. This parameter is mainly for pandas compatibility. @@ -1935,6 +1940,11 @@ class Frame(object, metaclass=ABCMeta): b0.01 dtype: float64 +>>> df.var(ddof=2) +a2.00 +b0.02 +dtype: float64 + >>> df.var(axis=1) 00.405 11.620 @@ -1954,8 +1964,12 @@ class Frame(object, metaclass=ABCMeta): >>> df['a'].var(ddof=0) 0. + +>>> df['a'].var(ddof=-2) +0.4 """ -assert ddof in (0, 1) +if not isinstance(ddof, int): +raise TypeError("ddof must be integer") axis = validate_axis(axis) @@ -1973,10 +1987,7 @@ class Frame(object, metaclass=ABCMeta): spark_type_to_pandas_dtype(spark_type), spark_type.simpleString() )
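The new expression generalizes the variance divisor from the hard-coded `ddof in (0, 1)` to `N - ddof` for any integer `ddof`. The formula, sketched in plain Python over the commit's example column `a = [1, 2, 3]` (the NaN row already dropped):

```python
def var_ddof(xs, ddof=1):
    # Variance with an arbitrary delta degrees of freedom:
    # sum of squared deviations from the mean, divided by N - ddof.
    n = len(xs)
    mean = sum(xs) / n
    return sum((x - mean) ** 2 for x in xs) / (n - ddof)

print(var_ddof([1.0, 2.0, 3.0]))           # 1.0 (sample variance, ddof=1)
print(var_ddof([1.0, 2.0, 3.0], ddof=2))   # 2.0 (matches the df.var(ddof=2) output above)
print(var_ddof([1.0, 2.0, 3.0], ddof=-2))  # 0.4 (matches the ddof=-2 doctest)
```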
[spark] branch master updated: [SPARK-40542][PS][SQL] Make `ddof` in `DataFrame.std` and `Series.std` accept arbitary integers
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 83167e56ff9 [SPARK-40542][PS][SQL] Make `ddof` in `DataFrame.std` and `Series.std` accept arbitary integers 83167e56ff9 is described below commit 83167e56ff9cdfeb29da81d07d56b482ccfedc74 Author: Ruifeng Zheng AuthorDate: Fri Sep 23 10:13:33 2022 +0800 [SPARK-40542][PS][SQL] Make `ddof` in `DataFrame.std` and `Series.std` accept arbitary integers ### What changes were proposed in this pull request? add a new `std` expression to support arbitary integeral `ddof` ### Why are the changes needed? for API coverage ### Does this PR introduce _any_ user-facing change? yes, it accept `ddof` other than {0, 1} before ``` In [4]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b']) In [5]: df.std(ddof=2) --- AssertionErrorTraceback (most recent call last) Cell In [5], line 1 > 1 df.std(ddof=2) File ~/Dev/spark/python/pyspark/pandas/generic.py:1866, in Frame.std(self, axis, skipna, ddof, numeric_only) 1803 def std( 1804 self, 1805 axis: Optional[Axis] = None, (...) 1808 numeric_only: bool = None, 1809 ) -> Union[Scalar, "Series"]: 1810 """ 1811 Return sample standard deviation. 1812 (...) 1864 0.816496580927726 1865 """ -> 1866 assert ddof in (0, 1) 1868 axis = validate_axis(axis) 1870 if numeric_only is None and axis == 0: AssertionError: ``` after: ``` In [3]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b']) In [4]: df.std(ddof=2) Out[4]: a1.414214 b0.141421 dtype: float64 In [5]: df.to_pandas().std(ddof=2) /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small. 
warnings.warn(message, PandasAPIOnSparkAdviceWarning) Out[5]: a1.414214 b0.141421 dtype: float64 ``` ### How was this patch tested? added testsuites Closes #37974 from zhengruifeng/ps_std_ddof. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/generic.py | 21 +++- python/pyspark/pandas/spark/functions.py | 5 + .../pyspark/pandas/tests/test_generic_functions.py | 6 ++ python/pyspark/pandas/tests/test_stats.py | 6 ++ .../expressions/aggregate/CentralMomentAgg.scala | 23 ++ .../spark/sql/api/python/PythonSQLUtils.scala | 4 6 files changed, 60 insertions(+), 5 deletions(-) diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py index cafa37e3d9b..6ba967da7f5 100644 --- a/python/pyspark/pandas/generic.py +++ b/python/pyspark/pandas/generic.py @@ -1810,6 +1810,8 @@ class Frame(object, metaclass=ABCMeta): """ Return sample standard deviation. +.. versionadded:: 3.3.0 + Parameters -- axis : {index (0), columns (1)} @@ -1822,6 +1824,9 @@ class Frame(object, metaclass=ABCMeta): ddof : int, default 1 Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements. + +.. versionchanged:: 3.4.0 + Supported including arbitary integers. numeric_only : bool, default None Include only float, int, boolean columns. False is not supported. This parameter is mainly for pandas compatibility. @@ -1843,6 +1848,11 @@ class Frame(object, metaclass=ABCMeta): b0.1 dtype: float64 +>>> df.std(ddof=2) +a1.414214 +b0.141421 +dtype: float64 + >>> df.std(axis=1) 00.636396 11.272792 @@ -1862,8 +1872,12 @@ class Frame(object, metaclass=ABCMeta): >>> df['a'].std(ddof=0) 0.816496580927726 + +>>> df['a'].std(ddof=-1) +0.707106... """ -assert ddof in (0, 1) +if not isinstance(ddof, int): +raise TypeError("ddof must be integer") axis = validate_axis(axis) @@ -1881,10 +1895,7 @@ clas
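`std` with arbitrary `ddof` is just the square root of the variance computed with divisor `N - ddof`. A plain-Python sketch reproducing the doctest values from the diff, including the now-legal negative `ddof`:

```python
from math import sqrt

def std_ddof(xs, ddof=1):
    # Standard deviation with divisor N - ddof; ddof may be any integer
    # (including negatives), as the new expression allows.
    n = len(xs)
    mean = sum(xs) / n
    return sqrt(sum((x - mean) ** 2 for x in xs) / (n - ddof))

print(round(std_ddof([1.0, 2.0, 3.0], ddof=0), 6))   # 0.816497 (population std)
print(round(std_ddof([1.0, 2.0, 3.0], ddof=-1), 6))  # 0.707107
```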
[spark] branch master updated (e5b4b32b2cd -> 1476a9f96e5)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e5b4b32b2cd [SPARK-40434][SS][PYTHON][FOLLOWUP] Address review comments add 1476a9f96e5 [SPARK-40510][PS] Implement `ddof` in `Series.cov` No new revisions were added by this update. Summary of changes: python/pyspark/pandas/frame.py | 2 +- python/pyspark/pandas/series.py| 22 ++- python/pyspark/pandas/tests/test_series.py | 35 ++ 3 files changed, 43 insertions(+), 16 deletions(-)
[spark] branch master updated (603dc509821 -> 2a6017a5b1d)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 603dc509821 [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark add 2a6017a5b1d [SPARK-40327][PS][DOCS] Add resampling to API references No new revisions were added by this update. Summary of changes: .../docs/source/reference/pyspark.pandas/index.rst | 1 + .../resampling.rst}| 44 ++- python/pyspark/pandas/resample.py | 294 - 3 files changed, 311 insertions(+), 28 deletions(-) copy python/docs/source/reference/{pyspark.sql/data_types.rst => pyspark.pandas/resampling.rst} (67%)
[spark] branch master updated (3921e689e82 -> c6f4526be5e)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 3921e689e82 [SPARK-40332][PS][DOCS][FOLLOWUP] Fix wrong underline length add c6f4526be5e [SPARK-40498][PS] Implement `kendall` and `min_periods` in `Series.corr` No new revisions were added by this update. Summary of changes: python/pyspark/pandas/frame.py| 3 + python/pyspark/pandas/series.py | 110 -- python/pyspark/pandas/tests/test_stats.py | 89 3 files changed, 151 insertions(+), 51 deletions(-)
[spark] branch master updated (a031aaa487e -> 3921e689e82)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from a031aaa487e [SPARK-40496][SQL] Fix configs to control "enableDateTimeParsingFallback" add 3921e689e82 [SPARK-40332][PS][DOCS][FOLLOWUP] Fix wrong underline length No new revisions were added by this update. Summary of changes: python/pyspark/pandas/groupby.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch master updated (2aeb8d74c45 -> eee6e45ed17)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 2aeb8d74c45 [MINOR][DOCS][PYTHON] Document datetime.timedelta <> DayTimeIntervalType add eee6e45ed17 [SPARK-40486][PS] Implement `spearman` and `kendall` in `DataFrame.corrwith` No new revisions were added by this update. Summary of changes: python/pyspark/pandas/correlation.py | 262 +++ python/pyspark/pandas/frame.py| 446 +- python/pyspark/pandas/tests/test_dataframe.py | 18 +- 3 files changed, 429 insertions(+), 297 deletions(-) create mode 100644 python/pyspark/pandas/correlation.py
[spark] branch master updated: [SPARK-40447][PS] Implement `kendall` correlation in `DataFrame.corr`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5b2bd1c9c0c [SPARK-40447][PS] Implement `kendall` correlation in `DataFrame.corr` 5b2bd1c9c0c is described below commit 5b2bd1c9c0cb109f8a801dfcfb6ba1305bf864c6 Author: Ruifeng Zheng AuthorDate: Sat Sep 17 07:30:31 2022 +0800 [SPARK-40447][PS] Implement `kendall` correlation in `DataFrame.corr` ### What changes were proposed in this pull request? Implement `kendall` correlation in `DataFrame.corr` ### Why are the changes needed? for API coverage ### Does this PR introduce _any_ user-facing change? yes, new correlation option: ``` In [1]: import pyspark.pandas as ps In [2]: df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)], columns=['dogs', 'cats']) In [3]: df.corr('kendall') dogs cats dogs 1.00 -0.912871 cats -0.912871 1.00 In [4]: df.to_pandas().corr('kendall') /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small. warnings.warn(message, PandasAPIOnSparkAdviceWarning) Out[4]: dogs cats dogs 1.00 -0.912871 cats -0.912871 1.00 ``` ### How was this patch tested? added UT Closes #37913 from zhengruifeng/ps_df_kendall. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/frame.py| 260 +- python/pyspark/pandas/tests/test_stats.py | 32 ++-- 2 files changed, 204 insertions(+), 88 deletions(-) diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 4149868dde9..d7b26cacda3 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -1424,9 +1424,10 @@ class DataFrame(Frame, Generic[T]): Parameters -- -method : {'pearson', 'spearman'} +method : {'pearson', 'spearman', 'kendall'} * pearson : standard correlation coefficient * spearman : Spearman rank correlation +* kendall : Kendall Tau correlation coefficient min_periods : int, optional Minimum number of observations required per pair of columns to have a valid result. @@ -1435,12 +1436,21 @@ class DataFrame(Frame, Generic[T]): Returns --- -y : DataFrame +DataFrame See Also +DataFrame.corrwith Series.corr +Notes +- +1. Pearson, Kendall and Spearman correlation are currently computed using pairwise + complete observations. + +2. The complexity of Spearman correlation is O(#row * #row), if the dataset is too + large, sampling ahead of correlation computation is recommended. + Examples >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)], @@ -1455,16 +1465,13 @@ class DataFrame(Frame, Generic[T]): dogs 1.00 -0.948683 cats -0.948683 1.00 -Notes -- -There are behavior differences between pandas-on-Spark and pandas. 
- -* the `method` argument only accepts 'pearson', 'spearman' +>>> df.corr('kendall') + dogs cats +dogs 1.00 -0.912871 +cats -0.912871 1.00 """ if method not in ["pearson", "spearman", "kendall"]: raise ValueError(f"Invalid method {method}") -if method == "kendall": -raise NotImplementedError("method doesn't support kendall for now") if min_periods is not None and not isinstance(min_periods, int): raise TypeError(f"Invalid min_periods type {type(min_periods).__name__}") @@ -1537,87 +1544,196 @@ class DataFrame(Frame, Generic[T]): .otherwise(F.col(f"{tmp_tuple_col_name}.{tmp_value_2_col_name}")) .alias(tmp_value_2_col_name), ) +not_null_cond = ( +F.col(tmp_value_1_col_name).isNotNull() & F.col(tmp_value_2_col_name).isNotNull() +) -# convert values to avg ranks for spearman correlation -if method == "spearman": -tmp_row_number_col_name = verify_temp_column_name(sdf, "__tmp_row_number_col__") -tmp_dense_rank_col_name = verify_temp_column_name(sdf, "__tmp_dense_rank_col__") -window = Window.partitionBy(tmp_index_1_col_name, tmp_inde
[spark] branch master updated (4e0fea2393b -> 4e5ece2be34)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 4e0fea2393b [SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data schema add 4e5ece2be34 [SPARK-40445][PS] Refactor `Resampler` for consistency and simplicity No new revisions were added by this update. Summary of changes: python/pyspark/pandas/groupby.py | 22 ++--- python/pyspark/pandas/resample.py | 67 ++- python/pyspark/pandas/series.py | 2 +- python/pyspark/pandas/window.py | 2 +- 4 files changed, 43 insertions(+), 50 deletions(-)
[spark] branch master updated: [SPARK-40196][PYTHON][PS][FOLLOWUP] Skip SparkFunctionsTests.test_repeat
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cd8c8746c34 [SPARK-40196][PYTHON][PS][FOLLOWUP] Skip SparkFunctionsTests.test_repeat cd8c8746c34 is described below commit cd8c8746c344fc48ba008830a0b816be05d98ca4 Author: Yikun Jiang AuthorDate: Fri Sep 16 15:07:49 2022 +0800 [SPARK-40196][PYTHON][PS][FOLLOWUP] Skip SparkFunctionsTests.test_repeat ### What changes were proposed in this pull request? Mark `SparkFunctionsTests.test_repeat` as placeholder. ### Why are the changes needed? ``` test_repeat (pyspark.pandas.tests.test_spark_functions.SparkFunctionsTests) ... FAIL (0.052s) == FAIL [0.052s]: test_repeat (pyspark.pandas.tests.test_spark_functions.SparkFunctionsTests) -- Traceback (most recent call last): File "/__w/spark/spark/python/pyspark/pandas/tests/test_spark_functions.py", line 28, in test_repeat self.assertTrue(spark_column_equals(SF.repeat(F.lit(1), 2), F.repeat(F.lit(1), 2))) AssertionError: False is not true -- Ran 1 test in 8.471s ``` According to https://github.com/apache/spark/pull/37888#discussion_r971408190 we'd better skip it first. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? CI passed Closes #37912 from Yikun/37888. 
Authored-by: Yikun Jiang Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/tests/test_spark_functions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/pandas/tests/test_spark_functions.py b/python/pyspark/pandas/tests/test_spark_functions.py index 4b95a8eb7d5..c18dc30240c 100644 --- a/python/pyspark/pandas/tests/test_spark_functions.py +++ b/python/pyspark/pandas/tests/test_spark_functions.py @@ -25,7 +25,8 @@ from pyspark.testing.pandasutils import PandasOnSparkTestCase class SparkFunctionsTests(PandasOnSparkTestCase): def test_repeat(self): -self.assertTrue(spark_column_equals(SF.repeat(F.lit(1), 2), F.repeat(F.lit(1), 2))) +# TODO: Placeholder +pass if __name__ == "__main__":
[spark] branch master updated: [SPARK-40440][PS][DOCS] Fix wrong reference and content in PS windows related doc
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3d14b745773 [SPARK-40440][PS][DOCS] Fix wrong reference and content in PS windows related doc 3d14b745773 is described below commit 3d14b745773f66d50c5ee5b3d7835b5f11132ec8 Author: Yikun Jiang AuthorDate: Thu Sep 15 17:14:21 2022 +0800 [SPARK-40440][PS][DOCS] Fix wrong reference and content in PS windows related doc ### What changes were proposed in this pull request? Fix wrong reference and content in PS windows related doc: - Add `pyspark.pandas.` for window function doc - Change `pandas_on_spark.DataFrame` to `pyspark.pandas.DataFrame` to make sure link generate correctly. - Fix `Returns` and `See Also` for `Rolling.count` - Add ewm doc for `Dataframe` and `series` ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ``` cd ~/spark/python/docs make html ``` ![image](https://user-images.githubusercontent.com/1736354/190328623-4c3250af-3968-430e-adf3-890d1bda850e.png) ![image](https://user-images.githubusercontent.com/1736354/190346064-b359e217-dac0-41df-8033-8477bc94b09b.png) ![image](https://user-images.githubusercontent.com/1736354/190346138-10bfc3ad-0a37-4bdb-b4aa-cc1ee2a3b018.png) Closes #37895 from Yikun/add-doc. 
Authored-by: Yikun Jiang Signed-off-by: Ruifeng Zheng --- .../docs/source/reference/pyspark.pandas/frame.rst | 1 + .../source/reference/pyspark.pandas/series.rst | 1 + .../source/reference/pyspark.pandas/window.rst | 8 +- python/pyspark/pandas/window.py| 301 +++-- 4 files changed, 160 insertions(+), 151 deletions(-) diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst b/python/docs/source/reference/pyspark.pandas/frame.rst index ff743371320..9c69ca647c4 100644 --- a/python/docs/source/reference/pyspark.pandas/frame.rst +++ b/python/docs/source/reference/pyspark.pandas/frame.rst @@ -151,6 +151,7 @@ Computations / Descriptive Stats DataFrame.count DataFrame.cov DataFrame.describe + DataFrame.ewm DataFrame.kurt DataFrame.kurtosis DataFrame.mad diff --git a/python/docs/source/reference/pyspark.pandas/series.rst b/python/docs/source/reference/pyspark.pandas/series.rst index 1cf63c1a8ae..5ed6df6b2a1 100644 --- a/python/docs/source/reference/pyspark.pandas/series.rst +++ b/python/docs/source/reference/pyspark.pandas/series.rst @@ -145,6 +145,7 @@ Computations / Descriptive Stats Series.cumsum Series.cumprod Series.describe + Series.ewm Series.filter Series.kurt Series.mad diff --git a/python/docs/source/reference/pyspark.pandas/window.rst b/python/docs/source/reference/pyspark.pandas/window.rst index b5d74f2029a..c840be357fa 100644 --- a/python/docs/source/reference/pyspark.pandas/window.rst +++ b/python/docs/source/reference/pyspark.pandas/window.rst @@ -21,9 +21,11 @@ Window == .. currentmodule:: pyspark.pandas.window -Rolling objects are returned by ``.rolling`` calls: :func:`pandas_on_spark.DataFrame.rolling`, :func:`pandas_on_spark.Series.rolling`, etc. -Expanding objects are returned by ``.expanding`` calls: :func:`pandas_on_spark.DataFrame.expanding`, :func:`pandas_on_spark.Series.expanding`, etc. -ExponentialMoving objects are returned by ``.ewm`` calls: :func:`pandas_on_spark.DataFrame.ewm`, :func:`pandas_on_spark.Series.ewm`, etc. 
+Rolling objects are returned by ``.rolling`` calls: :func:`pyspark.pandas.DataFrame.rolling`, :func:`pyspark.pandas.Series.rolling`, etc. + +Expanding objects are returned by ``.expanding`` calls: :func:`pyspark.pandas.DataFrame.expanding`, :func:`pyspark.pandas.Series.expanding`, etc. + +ExponentialMoving objects are returned by ``.ewm`` calls: :func:`pyspark.pandas.DataFrame.ewm`, :func:`pyspark.pandas.Series.ewm`, etc. Standard moving window functions diff --git a/python/pyspark/pandas/window.py b/python/pyspark/pandas/window.py index 274000cbb2c..7ff7a6a8eb6 100644 --- a/python/pyspark/pandas/window.py +++ b/python/pyspark/pandas/window.py @@ -224,10 +224,15 @@ class Rolling(RollingLike[FrameLike]): Returns --- -Series.expanding : Calling object with Series data. -DataFrame.expanding : Calling object with DataFrames. -Series.count : Count of the full Series. -DataFrame.count : Count of the full DataFrame. +Series or DataFrame +Return type is the same as the original object with `np.float64` dtype. + +See Also + +pyspark.pandas.Series.expanding : Calling object with Series data. +pyspark.pandas.DataFrame.expanding : Calling
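The `.ewm` entries added to the frame and series references describe exponentially weighted windows. As a rough illustration of the recursion those docs describe — a generic sketch with a hypothetical helper name, not pandas-on-Spark internals:

```python
def ewm_mean(values, alpha):
    # Exponentially weighted mean with smoothing factor alpha, using the
    # adjust=False recursion: y_t = (1 - alpha) * y_{t-1} + alpha * x_t.
    # Illustrative only; the real .ewm implementation runs over Spark columns.
    y = values[0]
    out = [y]
    for x in values[1:]:
        y = (1 - alpha) * y + alpha * x
        out.append(y)
    return out
```

With alpha = 0.5 each new observation gets half the weight of the running mean, so a constant series stays constant and a step input moves halfway toward the new level per step.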
[spark] branch master updated (ea6857abff8 -> 1c46c87ddb1)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from ea6857abff8 [SPARK-40426][SQL] Return a map from SparkThrowable.getMessageParameters add 1c46c87ddb1 [SPARK-40421][PS] Make `spearman` correlation in `DataFrame.corr` support missing values and `min_periods` No new revisions were added by this update. Summary of changes: python/pyspark/pandas/frame.py| 389 +- python/pyspark/pandas/tests/test_stats.py | 66 - 2 files changed, 275 insertions(+), 180 deletions(-)
[spark] branch master updated: [SPARK-40399][PS] Make `pearson` correlation in `DataFrame.corr` support missing values and `min_periods`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ae08787f5c5 [SPARK-40399][PS] Make `pearson` correlation in `DataFrame.corr` support missing values and `min_periods ` ae08787f5c5 is described below commit ae08787f5c50e485ef4432a0c2da8b3b7290d725 Author: Ruifeng Zheng AuthorDate: Tue Sep 13 14:44:18 2022 +0800 [SPARK-40399][PS] Make `pearson` correlation in `DataFrame.corr` support missing values and `min_periods ` ### What changes were proposed in this pull request? refactor `pearson` correlation in `DataFrame.corr` to: 1. support missing values; 2. add parameter `min_periods`; 3. enable arrow execution since no longer depend on `VectorUDT`; 4. support lazy evaluation; before ``` In [1]: import pyspark.pandas as ps In [2]: df = ps.DataFrame([[1,2], [3,None]]) In [3]: df 01 0 1 2.0 1 3 NaN In [4]: df.corr() 22/09/09 16:53:18 ERROR Executor: Exception in task 9.0 in stage 5.0 (TID 24) org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (VectorAssembler$$Lambda$2660/0x000801215840: (struct<0_double_VectorAssembler_0915f96ec689:double,1:double>) => struct,values:array>) ``` after ``` In [1]: import pyspark.pandas as ps In [2]: df = ps.DataFrame([[1,2], [3,None]]) In [3]: df.corr() 0 1 0 1.0 NaN 1 NaN NaN In [4]: df.to_pandas().corr() /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:976: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small. warnings.warn(message, PandasAPIOnSparkAdviceWarning) Out[4]: 0 1 0 1.0 NaN 1 NaN NaN ``` ### Why are the changes needed? for API coverage and support common cases containing missing values ### Does this PR introduce _any_ user-facing change? 
yes, API change, new parameter supported ### How was this patch tested? added UT Closes #37845 from zhengruifeng/ps_df_corr_missing_value. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/frame.py| 209 +- python/pyspark/pandas/tests/test_stats.py | 34 + 2 files changed, 238 insertions(+), 5 deletions(-) diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 3438d07896e..cf14a548266 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -1417,15 +1417,23 @@ class DataFrame(Frame, Generic[T]): agg = aggregate -def corr(self, method: str = "pearson") -> "DataFrame": +def corr(self, method: str = "pearson", min_periods: Optional[int] = None) -> "DataFrame": """ Compute pairwise correlation of columns, excluding NA/null values. +.. versionadded:: 3.3.0 + Parameters -- method : {'pearson', 'spearman'} * pearson : standard correlation coefficient * spearman : Spearman rank correlation +min_periods : int, optional +Minimum number of observations required per pair of columns +to have a valid result. Currently only available for Pearson +correlation. + +.. versionadded:: 3.4.0 Returns --- @@ -1454,11 +1462,202 @@ class DataFrame(Frame, Generic[T]): There are behavior differences between pandas-on-Spark and pandas. * the `method` argument only accepts 'pearson', 'spearman' -* the data should not contain NaNs. pandas-on-Spark will return an error. -* pandas-on-Spark doesn't support the following argument(s). +* if the `method` is `spearman`, the data should not contain NaNs. +* if the `method` is `spearman`, `min_periods` argument is not supported. 
+""" +if method not in ["pearson", "spearman", "kendall"]: +raise ValueError(f"Invalid method {method}") +if method == "kendall": +raise NotImplementedError("method doesn't support kendall for now") +if min_periods is not None and not isinstance(min_periods, int): +raise TypeError(f"Invalid min_periods type {type(min_periods).__name__}") +if min_periods is not None and method == "spearman": +raise NotImplementedError("min_periods doesn't support spearman for now") + +if method == "pea
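The pairwise-deletion behavior the commit describes — drop rows where either value is missing, return NaN when fewer than `min_periods` complete pairs remain — can be sketched in plain Python. `pearson_corr` is a hypothetical helper for illustration, not the Spark implementation:

```python
import math

def pearson_corr(xs, ys, min_periods=1):
    # Keep only the rows where both columns are present (pairwise deletion).
    pairs = [(x, y) for x, y in zip(xs, ys) if x is not None and y is not None]
    n = len(pairs)
    # Too few complete observations for a valid result.
    if n < max(min_periods, 1):
        return float("nan")
    mx = sum(x for x, _ in pairs) / n
    my = sum(y for _, y in pairs) / n
    sxy = sum((x - mx) * (y - my) for x, y in pairs)
    sxx = sum((x - mx) ** 2 for x, _ in pairs)
    syy = sum((y - my) ** 2 for _, y in pairs)
    # Zero variance in either column leaves the correlation undefined.
    if sxx == 0 or syy == 0:
        return float("nan")
    return sxy / math.sqrt(sxx * syy)
```

On the commit's example data (`[[1, 2], [3, None]]`) only one complete pair survives, so the off-diagonal entry is NaN — the same output the "after" transcript shows instead of the old `FAILED_EXECUTE_UDF` error.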
[spark] branch master updated: [SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f92b4941c63 [SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov` f92b4941c63 is described below commit f92b4941c631526a387c6f23554db53fbf922b96 Author: Ruifeng Zheng AuthorDate: Tue Sep 13 11:43:47 2022 +0800 [SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov` ### What changes were proposed in this pull request? 1, add a dedicated expression for `DataFrame.cov`; 2, add missing parameter `ddof` in `DataFrame.cov` ### Why are the changes needed? for api coverage ### Does this PR introduce _any_ user-facing change? yes, API change ``` >>> np.random.seed(42) >>> df = ps.DataFrame(np.random.randn(1000, 5), ... columns=['a', 'b', 'c', 'd', 'e']) >>> df.cov() a b c d e a 0.998438 -0.020161 0.059277 -0.008943 0.014144 b -0.020161 1.059352 -0.008543 -0.024738 0.009826 c 0.059277 -0.008543 1.010670 -0.001486 -0.000271 d -0.008943 -0.024738 -0.001486 0.921297 -0.013692 e 0.014144 0.009826 -0.000271 -0.013692 0.977795 >>> df.cov(ddof=2) a b c d e a 0.999439 -0.020181 0.059336 -0.008952 0.014159 b -0.020181 1.060413 -0.008551 -0.024762 0.009836 c 0.059336 -0.008551 1.011683 -0.001487 -0.000271 d -0.008952 -0.024762 -0.001487 0.90 -0.013705 e 0.014159 0.009836 -0.000271 -0.013705 0.978775 >>> df.cov(ddof=-1) a b c d e a 0.996444 -0.020121 0.059158 -0.008926 0.014116 b -0.020121 1.057235 -0.008526 -0.024688 0.009807 c 0.059158 -0.008526 1.008650 -0.001483 -0.000270 d -0.008926 -0.024688 -0.001483 0.919456 -0.013664 e 0.014116 0.009807 -0.000270 -0.013664 0.975842 ``` ### How was this patch tested? added tests Closes #37829 from zhengruifeng/ps_cov_ddof. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/frame.py | 31 +- python/pyspark/pandas/spark/functions.py | 5 python/pyspark/pandas/tests/test_dataframe.py | 10 +++ .../expressions/aggregate/Covariance.scala | 22 +++ .../spark/sql/api/python/PythonSQLUtils.scala | 4 +++ 5 files changed, 66 insertions(+), 6 deletions(-) diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 2a7fda2d527..3438d07896e 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -8738,8 +8738,7 @@ defaultdict(, {'col..., 'col...})] internal = self._internal.with_new_sdf(sdf, data_fields=data_fields) self._update_internal_frame(internal, check_same_anchor=False) -# TODO: ddof should be implemented. -def cov(self, min_periods: Optional[int] = None) -> "DataFrame": +def cov(self, min_periods: Optional[int] = None, ddof: int = 1) -> "DataFrame": """ Compute pairwise covariance of columns, excluding NA/null values. @@ -8755,8 +8754,7 @@ defaultdict(, {'col..., 'col...})] below this threshold will be returned as ``NaN``. This method is generally used for the analysis of time series data to -understand the relationship between different measures -across time. +understand the relationship between different measures across time. .. versionadded:: 3.3.0 @@ -8765,6 +8763,11 @@ defaultdict(, {'col..., 'col...})] min_periods : int, optional Minimum number of observations required per pair of columns to have a valid result. +ddof : int, default 1 +Delta degrees of freedom. The divisor used in calculations +is ``N - ddof``, where ``N`` represents the number of elements. + +.. 
versionadded:: 3.4.0 Returns --- @@ -8794,6 +8797,20 @@ defaultdict(, {'col..., 'col...})] c 0.059277 -0.008543 1.010670 -0.001486 -0.000271 d -0.008943 -0.024738 -0.001486 0.921297 -0.013692 e 0.014144 0.009826 -0.000271 -0.013692 0.977795 +>>> df.cov(ddof=2) + a b c d e +a 0.999439 -0.020181 0.059336 -0.008952 0.014159 +b -0.020181 1.060413 -0.008551 -0.024762 0.009836 +c 0.059336 -0.008551 1.011683 -0.001487 -0.000
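The `ddof` semantics added here — a divisor of `N - ddof` — can be checked with a tiny pure-Python sketch (`sample_cov` is a hypothetical name used only for illustration):

```python
def sample_cov(xs, ys, ddof=1):
    # Covariance with divisor N - ddof, mirroring the new `ddof` parameter
    # on DataFrame.cov (ddof=1 is the sample covariance, ddof=0 population).
    n = len(xs)
    if n - ddof <= 0:
        return float("nan")
    mx = sum(xs) / n
    my = sum(ys) / n
    return sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / (n - ddof)
```

This also shows why negative `ddof` (as in the commit's `df.cov(ddof=-1)` example) is meaningful: it simply grows the divisor beyond N.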
[spark] branch master updated: [SPARK-40332][PS] Implement `GroupBy.quantile`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6577c43852d [SPARK-40332][PS] Implement `GroupBy.quantile` 6577c43852d is described below commit 6577c43852dca1725ab77650103b7763e6520500 Author: Yikun Jiang AuthorDate: Fri Sep 9 08:24:29 2022 +0800 [SPARK-40332][PS] Implement `GroupBy.quantile` ### What changes were proposed in this pull request? Implement `GroupBy.quantile` ### Why are the changes needed? Improve PS api coverage ### Does this PR introduce _any_ user-facing change? yes, new API ```python >>> df = ps.DataFrame([ ... ['a', 1], ['a', 2], ['a', 3], ... ['b', 1], ['b', 3], ['b', 5] ... ], columns=['key', 'val']) >>> df.groupby('key').quantile() val key a2.0 b3.0 ``` ### How was this patch tested? UT Closes #37816 from Yikun/SPARK-40332. Authored-by: Yikun Jiang Signed-off-by: Ruifeng Zheng --- .../source/reference/pyspark.pandas/groupby.rst| 1 + python/pyspark/pandas/groupby.py | 64 +- python/pyspark/pandas/missing/groupby.py | 2 - python/pyspark/pandas/tests/test_groupby.py| 42 ++ 4 files changed, 106 insertions(+), 3 deletions(-) diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst b/python/docs/source/reference/pyspark.pandas/groupby.rst index 24e3bde91f5..4c29964966c 100644 --- a/python/docs/source/reference/pyspark.pandas/groupby.rst +++ b/python/docs/source/reference/pyspark.pandas/groupby.rst @@ -80,6 +80,7 @@ Computations / Descriptive Stats GroupBy.sum GroupBy.var GroupBy.nunique + GroupBy.quantile GroupBy.size GroupBy.diff GroupBy.idxmax diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index 01163b61375..2e2e5540bd4 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -45,7 +45,7 @@ from typing import ( import warnings import pandas as pd -from pandas.api.types import 
is_hashable, is_list_like # type: ignore[attr-defined] +from pandas.api.types import is_number, is_hashable, is_list_like # type: ignore[attr-defined] if LooseVersion(pd.__version__) >= LooseVersion("1.3.0"): from pandas.core.common import _builtin_table # type: ignore[attr-defined] @@ -58,6 +58,7 @@ from pyspark.sql import Column, DataFrame as SparkDataFrame, Window, functions a from pyspark.sql.types import ( BooleanType, DataType, +DoubleType, NumericType, StructField, StructType, @@ -581,6 +582,67 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): F.mean, accepted_spark_types=(NumericType,), bool_to_numeric=True ) +# TODO: 'q' accepts list like type +def quantile(self, q: float = 0.5, accuracy: int = 1) -> FrameLike: +""" +Return group values at the given quantile. + +.. versionadded:: 3.4.0 + +Parameters +-- +q : float, default 0.5 (50% quantile) +Value between 0 and 1 providing the quantile to compute. +accuracy : int, optional +Default accuracy of approximation. Larger value means better accuracy. +The relative error can be deduced by 1.0 / accuracy. +This is a panda-on-Spark specific parameter. + +Returns +--- +pyspark.pandas.Series or pyspark.pandas.DataFrame +Return type determined by caller of GroupBy object. + +Notes +--- +`quantile` in pandas-on-Spark are using distributed percentile approximation +algorithm unlike pandas, the result might different with pandas, also +`interpolation` parameters are not supported yet. + +See Also + +pyspark.pandas.Series.quantile +pyspark.pandas.DataFrame.quantile +pyspark.sql.functions.percentile_approx + +Examples + +>>> df = ps.DataFrame([ +... ['a', 1], ['a', 2], ['a', 3], +... ['b', 1], ['b', 3], ['b', 5] +... ], columns=['key', 'val']) + +Groupby one column and return the quantile of the remaining columns in +each group. 
+ +>>> df.groupby('key').quantile() + val +key +a2.0 +b3.0 +""" +if is_list_like(q): +raise NotImplementedError("q doesn't support for list like type for now") +if not is_number(q): +raise TypeError("must be real number, not %s" % type(q).__name__) +if not 0 <= q <= 1: +raise Val
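Using the example data from the commit message, the grouped-median semantics (the default q=0.5) look like this in plain Python. Note the real API computes quantiles via `percentile_approx`, so its answers are approximate within `1.0 / accuracy`; `group_quantile` below is an exact illustrative stand-in:

```python
import statistics
from collections import defaultdict

def group_quantile(rows, q=0.5):
    # Exact per-group quantile for the median case only; GroupBy.quantile in
    # pandas-on-Spark uses a distributed approximation instead.
    if q != 0.5:
        raise NotImplementedError("sketch only handles the median")
    groups = defaultdict(list)
    for key, val in rows:
        groups[key].append(val)
    return {key: statistics.median(vals) for key, vals in groups.items()}
```

On the commit's data this reproduces the documented result: group `a` → 2.0, group `b` → 3.0.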
[spark] branch master updated: [SPARK-40383][INFRA] Pin `mypy==0.920` in dev/requirements.txt
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f8854184a76 [SPARK-40383][INFRA] Pin `mypy==0.920` in dev/requirements.txt f8854184a76 is described below commit f8854184a76562b4dfdd44a171bfb904fa17cd76 Author: Ruifeng Zheng AuthorDate: Fri Sep 9 08:18:08 2022 +0800 [SPARK-40383][INFRA] Pin `mypy==0.920` in dev/requirements.txt ### What changes were proposed in this pull request? Pin `mypy==0.920` in dev/requirements.txt, the same as that in `.github/workflows/build_and_test.yml` ### Why are the changes needed? before: the installed version of mypy is `mypy 0.971` now, then `dev/lint-python ` always fail: ``` (spark_dev) ➜ spark git:(master) ✗ dev/lint-python starting python compilation test... python compilation succeeded. starting black test... black checks passed. starting flake8 test... flake8 checks passed. starting mypy annotations test... annotations failed mypy checks: python/pyspark/streaming/context.py:372: error: Unused "type: ignore" comment python/pyspark/streaming/context.py:378: error: Unused "type: ignore" comment Found 2 errors in 1 file (checked 339 source files) 1 ``` after: ``` (spark_dev) ➜ spark git:(master) ✗ dev/lint-python starting python compilation test... python compilation succeeded. starting black test... black checks passed. starting flake8 test... flake8 checks passed. starting mypy annotations test... annotations passed mypy checks. starting mypy examples test... examples passed mypy checks. starting mypy data test... annotations passed data checks. all lint-python tests passed! ``` ### Does this PR introduce _any_ user-facing change? No, only for spark contributors. ### How was this patch tested? manually test Closes #37827 from zhengruifeng/infra_pin_mypy. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- dev/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/requirements.txt b/dev/requirements.txt index 7771b97a732..00055295aaf 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -18,7 +18,7 @@ unittest-xml-reporting coverage # Linter -mypy +mypy==0.920 pytest-mypy-plugins==1.9.3 flake8==3.9.0 # See SPARK-38680.
[spark] branch master updated: [SPARK-40333][PS] Implement `GroupBy.nth`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4d73552abf3 [SPARK-40333][PS] Implement `GroupBy.nth` 4d73552abf3 is described below commit 4d73552abf39c687a1ef1f742fcecdf7492995af Author: Ruifeng Zheng AuthorDate: Thu Sep 8 16:04:26 2022 +0800 [SPARK-40333][PS] Implement `GroupBy.nth` ### What changes were proposed in this pull request? Implement `GroupBy.nth` ### Why are the changes needed? for API coverage ### Does this PR introduce _any_ user-facing change? yes, new API ``` In [4]: import pyspark.pandas as ps In [5]: import numpy as np In [6]: df = ps.DataFrame({'A': [1, 1, 2, 1, 2], 'B': [np.nan, 2, 3, 4, 5], 'C': ['a', 'b', 'c', 'd', 'e']}, columns=['A', 'B', 'C']) In [7]: df.groupby('A').nth(0) B C A 1 NaN a 2 3.0 c In [8]: df.groupby('A').nth(2) Out[8]: B C A 1 4.0 d In [9]: df.C.groupby(df.A).nth(-1) Out[9]: A 1d 2e Name: C, dtype: object In [10]: df.C.groupby(df.A).nth(-2) Out[10]: A 1b 2c Name: C, dtype: object ``` ### How was this patch tested? added UT Closes #37801 from zhengruifeng/ps_groupby_nth. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../source/reference/pyspark.pandas/groupby.rst| 1 + python/pyspark/pandas/groupby.py | 98 ++ python/pyspark/pandas/missing/groupby.py | 2 - python/pyspark/pandas/tests/test_groupby.py| 11 +++ 4 files changed, 110 insertions(+), 2 deletions(-) diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst b/python/docs/source/reference/pyspark.pandas/groupby.rst index b331a49b683..24e3bde91f5 100644 --- a/python/docs/source/reference/pyspark.pandas/groupby.rst +++ b/python/docs/source/reference/pyspark.pandas/groupby.rst @@ -73,6 +73,7 @@ Computations / Descriptive Stats GroupBy.mean GroupBy.median GroupBy.min + GroupBy.nth GroupBy.rank GroupBy.sem GroupBy.std diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index 84a5a3377f3..01163b61375 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -895,6 +895,104 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): bool_to_numeric=True, ) +# TODO: 1, 'n' accepts list and slice; 2, implement 'dropna' parameter +def nth(self, n: int) -> FrameLike: +""" +Take the nth row from each group. + +.. versionadded:: 3.4.0 + +Parameters +-- +n : int +A single nth value for the row + +Returns +--- +Series or DataFrame + +Notes +- +There is a behavior difference between pandas-on-Spark and pandas: + +* when there is no aggregation column, and `n` not equal to 0 or -1, +the returned empty dataframe may have an index with different lenght `__len__`. 
+ +Examples + +>>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2], +...'B': [np.nan, 2, 3, 4, 5]}, columns=['A', 'B']) +>>> g = df.groupby('A') +>>> g.nth(0) + B +A +1 NaN +2 3.0 +>>> g.nth(1) + B +A +1 2.0 +2 5.0 +>>> g.nth(-1) + B +A +1 4.0 +2 5.0 + +See Also + +pyspark.pandas.Series.groupby +pyspark.pandas.DataFrame.groupby +""" +if isinstance(n, slice) or is_list_like(n): +raise NotImplementedError("n doesn't support slice or list for now") +if not isinstance(n, int): +raise TypeError("Invalid index %s" % type(n).__name__) + +groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))] +internal, agg_columns, sdf = self._prepare_reduce( +groupkey_names=groupkey_names, +accepted_spark_types=None, +bool_to_numeric=False, +) +psdf: DataFrame = DataFrame(internal) + +if len(psdf._internal.column_labels) > 0: +window1 = Window.partitionBy(*groupkey_names).orderBy(NATURAL_ORDER_COLUMN_NAME) +tmp_row_number_col = verify_temp_column_name(sdf, "__tmp_row_number_col__") +if n >= 0: +sdf = ( +psdf._internal.spark_frame.withColumn( +tmp_row_number_col, F.row_number().over(window1)
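The commit's examples (including negative `n`) follow these semantics: take the nth row from each group, dropping groups that have no nth row. `group_nth` below is a hypothetical pure-Python sketch of that behavior, not the row-number window plan the commit actually builds:

```python
def group_nth(rows, n):
    # Collect values per key in arrival order, then index each group's list
    # with n; Python indexing gives the negative-n behavior for free, and
    # groups without an nth element are simply omitted.
    groups = {}
    for key, val in rows:
        groups.setdefault(key, []).append(val)
    out = {}
    for key, vals in groups.items():
        if -len(vals) <= n < len(vals):
            out[key] = vals[n]
    return out
```

Mirroring the commit's DataFrame (keys `[1, 1, 2, 1, 2]`): `nth(2)` returns only group 1, because group 2 has just two rows.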
[spark] branch master updated: [SPARK-40265][PS] Fix the inconsistent behavior for Index.intersection
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8482ec9e5d8 [SPARK-40265][PS] Fix the inconsistent behavior for Index.intersection 8482ec9e5d8 is described below commit 8482ec9e5d832f89fa55d29cdde0f8005a062f17 Author: itholic AuthorDate: Mon Sep 5 14:56:37 2022 +0800 [SPARK-40265][PS] Fix the inconsistent behavior for Index.intersection ### What changes were proposed in this pull request? This PR proposes to fix the inconsistent behavior for `Index.intersection` function as below: When `other` is list of tuple, the behavior of pandas API on Spark is difference from pandas. - pandas API on Spark ```python >>> psidx Int64Index([1, 2, 3, 4], dtype='int64', name='Koalas') >>> psidx.intersection([(1, 2), (3, 4)]).sort_values() MultiIndex([], ) ``` - pandas ```python >>> pidx Int64Index([1, 2, 3, 4], dtype='int64', name='Koalas') >>> pidx.intersection([(1, 2), (3, 4)]).sort_values() Traceback (most recent call last): ... ValueError: Names should be list-like for a MultiIndex ``` ### Why are the changes needed? To reach parity with pandas. ### Does this PR introduce _any_ user-facing change? Yes, the behavior of `Index.intersection` is chaged, when the `other` is list of tuple: - Before ```python >>> psidx Int64Index([1, 2, 3, 4], dtype='int64', name='Koalas') >>> psidx.intersection([(1, 2), (3, 4)]).sort_values() MultiIndex([], ) ``` - After ```python >>> psidx Int64Index([1, 2, 3, 4], dtype='int64', name='Koalas') >>> psidx.intersection([(1, 2), (3, 4)]).sort_values() Traceback (most recent call last): ... ValueError: Names should be list-like for a MultiIndex ``` ### How was this patch tested? Added a unit test. Closes #37739 from itholic/SPARK-40265. 
Authored-by: itholic Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/indexes/base.py| 2 +- python/pyspark/pandas/tests/indexes/test_base.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py index facedb1dc91..5043325ccbb 100644 --- a/python/pyspark/pandas/indexes/base.py +++ b/python/pyspark/pandas/indexes/base.py @@ -2509,7 +2509,7 @@ class Index(IndexOpsMixin): elif is_list_like(other): other_idx = Index(other) if isinstance(other_idx, MultiIndex): -return other_idx.to_frame().head(0).index +raise ValueError("Names should be list-like for a MultiIndex") spark_frame_other = other_idx.to_frame()._to_spark() keep_name = True else: diff --git a/python/pyspark/pandas/tests/indexes/test_base.py b/python/pyspark/pandas/tests/indexes/test_base.py index 958314c5741..169a22571ec 100644 --- a/python/pyspark/pandas/tests/indexes/test_base.py +++ b/python/pyspark/pandas/tests/indexes/test_base.py @@ -1977,6 +1977,9 @@ class IndexesTest(ComparisonTestBase, TestUtils): psidx.intersection(ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})) with self.assertRaisesRegex(ValueError, "Index data must be 1-dimensional"): psmidx.intersection(ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})) +# other = list of tuple +with self.assertRaisesRegex(ValueError, "Names should be list-like for a MultiIndex"): +psidx.intersection([(1, 2), (3, 4)]) def test_item(self): pidx = pd.Index([10])
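A minimal sketch of the check being fixed: a non-empty list of tuples is MultiIndex-like data, and intersecting it with a flat index should raise, as pandas does. The function name is hypothetical and this stands in for the `Index.intersection` branch shown in the diff:

```python
def check_intersection_other(other):
    # List-of-tuples input would build a MultiIndex; intersecting that with a
    # flat Int64Index now raises instead of returning an empty MultiIndex.
    if other and all(isinstance(x, tuple) for x in other):
        raise ValueError("Names should be list-like for a MultiIndex")
    return other
```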
[spark] branch master updated: [SPARK-40301][PYTHON] Add parameter validations in pyspark.rdd
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c14320497d5 [SPARK-40301][PYTHON] Add parameter validations in pyspark.rdd c14320497d5 is described below commit c14320497d5415616b3f65b120336f963d8ab46b Author: Ruifeng Zheng AuthorDate: Sun Sep 4 14:29:42 2022 +0800 [SPARK-40301][PYTHON] Add parameter validations in pyspark.rdd ### What changes were proposed in this pull request? 1,compared with the scala side, some parameter validations were missing in `pyspark.rdd` 2, `rdd.sample` checking fraction will raise `ValueError` instead of `AssertionError` ### Why are the changes needed? add missing parameter validations in `pyspark.rdd` ### Does this PR introduce _any_ user-facing change? yes, when fraction is invalide, `ValueError` is raised instead of `AssertionError` ### How was this patch tested? existing testsutes Closes #37752 from zhengruifeng/py_rdd_check. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/rdd.py | 9 - 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 5fe463233a2..7ef0014ae75 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1039,7 +1039,8 @@ class RDD(Generic[T_co]): >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14 True """ -assert fraction >= 0.0, "Negative fraction value: %s" % fraction +if not fraction >= 0: +raise ValueError("Fraction must be nonnegative.") return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) def randomSplit( @@ -1077,7 +1078,11 @@ class RDD(Generic[T_co]): >>> 250 < rdd2.count() < 350 True """ +if not all(w >= 0 for w in weights): +raise ValueError("Weights must be nonnegative") s = float(sum(weights)) +if not s > 0: +raise ValueError("Sum of weights must be positive") cweights = [0.0] for w in weights: cweights.append(cweights[-1] + w / s) @@ -4565,6 +4570,8 @@ class RDD(Generic[T_co]): >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() [[1, 2, 3, 4, 5]] """ +if not numPartitions > 0: +raise ValueError("Number of partitions must be positive.") if shuffle: # Decrease the batch size in order to distribute evenly the elements across output # partitions. Otherwise, repartition will possibly produce highly skewed partitions.
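Extracted from the diff above, the new validations can be exercised standalone. These helpers mirror the checks added to `RDD.sample` and `RDD.randomSplit` (the helper names are hypothetical; in Spark the checks are inline in the methods):

```python
def validate_sample_fraction(fraction):
    # RDD.sample now raises ValueError for a negative fraction
    # instead of failing an assert.
    if not fraction >= 0:
        raise ValueError("Fraction must be nonnegative.")
    return fraction

def normalize_weights(weights):
    # RDD.randomSplit's new validations, followed by the cumulative
    # weight boundaries the splitter computes from the normalized weights.
    if not all(w >= 0 for w in weights):
        raise ValueError("Weights must be nonnegative")
    s = float(sum(weights))
    if not s > 0:
        raise ValueError("Sum of weights must be positive")
    cweights = [0.0]
    for w in weights:
        cweights.append(cweights[-1] + w / s)
    return cweights
```

Note the user-facing change: invalid input now raises `ValueError` rather than `AssertionError`, so callers catching the old exception type need updating.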
[spark] branch master updated: [SPARK-40305][PS] Implement Groupby.sem
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new dde94a68b85 [SPARK-40305][PS] Implement Groupby.sem dde94a68b85 is described below commit dde94a68b850b23df8fca3531350a7b5643b3cd1 Author: Ruifeng Zheng AuthorDate: Sun Sep 4 14:25:47 2022 +0800 [SPARK-40305][PS] Implement Groupby.sem ### What changes were proposed in this pull request? Implement Groupby.sem ### Why are the changes needed? to increase API coverage ### Does this PR introduce _any_ user-facing change? yes, new API ### How was this patch tested? added UT Closes #37756 from zhengruifeng/ps_groupby_sem. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../source/reference/pyspark.pandas/groupby.rst| 1 + python/pyspark/pandas/generic.py | 2 +- python/pyspark/pandas/groupby.py | 80 -- python/pyspark/pandas/missing/groupby.py | 2 - python/pyspark/pandas/tests/test_groupby.py| 21 ++ 5 files changed, 97 insertions(+), 9 deletions(-) diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst b/python/docs/source/reference/pyspark.pandas/groupby.rst index 6d8eed8e684..b331a49b683 100644 --- a/python/docs/source/reference/pyspark.pandas/groupby.rst +++ b/python/docs/source/reference/pyspark.pandas/groupby.rst @@ -74,6 +74,7 @@ Computations / Descriptive Stats GroupBy.median GroupBy.min GroupBy.rank + GroupBy.sem GroupBy.std GroupBy.sum GroupBy.var diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py index bd2b68da51f..8ce3061b7cd 100644 --- a/python/pyspark/pandas/generic.py +++ b/python/pyspark/pandas/generic.py @@ -2189,7 +2189,7 @@ class Frame(object, metaclass=ABCMeta): return F.stddev_samp(spark_column) def sem(psser: "Series") -> Column: -return std(psser) / pow(Frame._count_expr(psser), 0.5) +return std(psser) / F.sqrt(Frame._count_expr(psser)) return 
self._reduce_for_stat_function( sem, diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index 4377ad6a5c9..84a5a3377f3 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -650,12 +650,11 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): assert ddof in (0, 1) # Raise the TypeError when all aggregation columns are of unaccepted data types -all_unaccepted = True -for _agg_col in self._agg_columns: -if isinstance(_agg_col.spark.data_type, (NumericType, BooleanType)): -all_unaccepted = False -break -if all_unaccepted: +any_accepted = any( +isinstance(_agg_col.spark.data_type, (NumericType, BooleanType)) +for _agg_col in self._agg_columns +) +if not any_accepted: raise TypeError( "Unaccepted data types of aggregation columns; numeric or bool expected." ) @@ -827,6 +826,75 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): return self._prepare_return(DataFrame(internal)) +def sem(self, ddof: int = 1) -> FrameLike: +""" +Compute standard error of the mean of groups, excluding missing values. + +.. versionadded:: 3.4.0 + +Parameters +-- +ddof : int, default 1 +Delta Degrees of Freedom. The divisor used in calculations is N - ddof, +where N represents the number of elements. + +Examples + +>>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True], +..."C": [3, None, 3, 4], "D": ["a", "b", "b", "a"]}) + +>>> df.groupby("A").sem() + B C +A +1 0.33 0.33 +2 NaN NaN + +>>> df.groupby("D").sem(ddof=1) + ABC +D +a 0.0 0.0 0.5 +b 0.5 0.0 NaN + +>>> df.B.groupby(df.A).sem() +A +10.33 +2 NaN +Name: B, dtype: float64 + +See Also + +pyspark.pandas.Series.sem +pyspark.pandas.DataFrame.sem +""" +if ddof not in [0, 1]: +raise TypeError("ddof must be 0 or 1") + +# Raise the TypeError when all aggregation columns are of unaccepted data types +any_accepted = any( +isinstance(_agg_col.
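The `sem` added above is just the per-group sample standard deviation divided by `sqrt(N)` (the commit swaps `pow(n, 0.5)` for `F.sqrt(n)`). A pure-Python sketch of those semantics — an illustrative model of the behavior, not the pandas-on-Spark implementation; the helper name `groupby_sem` is made up:

```python
import math

def groupby_sem(rows, key, value, ddof=1):
    # Group rows by `key`, compute standard error of the mean of `value`
    # in each group: sample std / sqrt(n), skipping missing values.
    groups = {}
    for row in rows:
        groups.setdefault(row[key], [])
        v = row[value]
        if v is not None:
            groups[row[key]].append(v)
    result = {}
    for k, vals in groups.items():
        n = len(vals)
        if n - ddof <= 0:
            # mirrors the NaN that GroupBy.sem yields for empty/singleton groups
            result[k] = float("nan")
            continue
        mean = sum(vals) / n
        var = sum((v - mean) ** 2 for v in vals) / (n - ddof)
        result[k] = math.sqrt(var) / math.sqrt(n)
    return result

# same data as the "C" column of the docstring example, grouped by "A"
rows = [
    {"A": 1, "C": 3.0}, {"A": 2, "C": None},
    {"A": 1, "C": 3.0}, {"A": 1, "C": 4.0},
]
sems = groupby_sem(rows, key="A", value="C")
```

For group `A=1` the values are `[3, 3, 4]`, so std is `sqrt(1/3)` and sem is `1/3 ≈ 0.33`, matching the docstring output; group `A=2` has only a missing value, giving NaN.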
[spark] branch master updated: [SPARK-39284][FOLLOW] Add Groupby.mad to API references
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2ef2ad27faa [SPARK-39284][FOLLOW] Add Groupby.mad to API references 2ef2ad27faa is described below commit 2ef2ad27faa2599c687f7ead2a2855fa9b7495a3 Author: Ruifeng Zheng AuthorDate: Fri Sep 2 17:30:41 2022 +0800 [SPARK-39284][FOLLOW] Add Groupby.mad to API references ### What changes were proposed in this pull request? Add `Groupby.mad` to API references ### Why are the changes needed? `Groupby.mad` was implemented in https://github.com/apache/spark/pull/36660, but I forgot to add it to the doc ### Does this PR introduce _any_ user-facing change? yes, this API will be listed in the references ### How was this patch tested? existing doc building Closes #37767 from zhengruifeng/ps_ref_groupby_mad. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/docs/source/reference/pyspark.pandas/groupby.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst b/python/docs/source/reference/pyspark.pandas/groupby.rst index 2aa39d25765..6d8eed8e684 100644 --- a/python/docs/source/reference/pyspark.pandas/groupby.rst +++ b/python/docs/source/reference/pyspark.pandas/groupby.rst @@ -68,6 +68,7 @@ Computations / Descriptive Stats GroupBy.filter GroupBy.first GroupBy.last + GroupBy.mad GroupBy.max GroupBy.mean GroupBy.median - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (fd0498f81df -> b61bfde61d3)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from fd0498f81df [SPARK-40304][K8S][TESTS] Add `decomTestTag` to K8s Integration Test add b61bfde61d3 [SPARK-40210][PYTHON] Fix math atan2, hypot, pow and pmod float argument call No new revisions were added by this update. Summary of changes: python/pyspark/sql/functions.py| 59 -- python/pyspark/sql/tests/test_functions.py | 10 + 2 files changed, 17 insertions(+), 52 deletions(-)
[spark] branch master updated: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a027db1dc03 [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered a027db1dc03 is described below commit a027db1dc0379d823cbd638181bb25095f4e6577 Author: Ruifeng Zheng AuthorDate: Thu Sep 1 09:19:11 2022 +0800 [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered ### What changes were proposed in this pull request? use `Array` instead of `BoundedPriorityQueue` to store intermediate results ### Why are the changes needed? 1, encountered a case that `RDD.takeOrdered` fails due to `Total size of serialized results of xxx tasks (... MiB) is bigger than spark.driver.` 2, performance improvement: `bin/spark-shell --driver-memory=4G`

```scala
Seq(10, 100, 1000, 10000, 50000, 100000).foreach { n =>
  val start = System.currentTimeMillis;
  Seq.range(0, 10).foreach(_ => sc.range(0, 1, 1, 1000).top(n));
  val duration = System.currentTimeMillis - start;
  println(s"n=$n, duration=$duration")
}
```

duration | n=10 | n=100 | n=1,000 | n=10,000 | n=50,000 | n=100,000
-- | -- | -- | -- | -- | -- | --
master | 2,552 | 2,197 | 2,543 | 10,003 | 58,552 | org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 763 tasks (1024.6 MiB) is bigger than spark.driver
this PR | 2,556 | 2,138 | 2,196 | 7,371 | 33,903 | 66,895
this PR + treeReduce | 9,160 | 9,748 | 9,728 | 11,441 | 17,216 | 24,728

it is strange that `this PR + treeReduce` turns out to be slowest when `n` is small, so still use `reduce` in this PR. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? added UT Closes #37728 from zhengruifeng/core_take_ordered.
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 32 -- .../org/apache/spark/util/collection/Utils.scala | 19 - .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 5 .../resources/tpcds-query-results/v1_4/q77.sql.out | 2 +- .../tpcds-query-results/v2_7/q77a.sql.out | 2 +- 5 files changed, 42 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 461510b2526..d12804fc12b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -46,7 +46,7 @@ import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.resource.ResourceProfile import org.apache.spark.storage.{RDDBlockId, StorageLevel} -import org.apache.spark.util.{BoundedPriorityQueue, Utils} +import org.apache.spark.util.Utils import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap, Utils => collectionUtils} import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler, @@ -1523,22 +1523,24 @@ abstract class RDD[T: ClassTag]( * @return an array of top elements */ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { -if (num == 0) { +if (num == 0 || this.getNumPartitions == 0) { Array.empty } else { - val mapRDDs = mapPartitions { items => -// Priority keeps the largest elements, so let's reverse the ordering. -val queue = new BoundedPriorityQueue[T](num)(ord.reverse) -queue ++= collectionUtils.takeOrdered(items, num)(ord) -Iterator.single(queue) - } - if (mapRDDs.partitions.length == 0) { -Array.empty - } else { -mapRDDs.reduce { (queue1, queue2) => - queue1 ++= queue2 - queue1 -}.toArray.sorted(ord) + this.mapPartitionsWithIndex { case (pid, iter) => +if (iter.nonEmpty) { + // Priority keeps the largest elements, so let's reverse the ordering. 
+ Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray) +} else if (pid == 0) { + // make sure partition 0 always returns an array to avoid reduce on empty RDD + Iterator.single(Array.empty[T]) +} else { + Iterator.empty +} + }.reduce { (array1, array2) => +val size = math.min(num, array1.length + array2.length) +val array = Array.ofDim[T](size) +collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).copyToArray(array, 0, size) +array } } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
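The reworked `takeOrdered` above ships a plain sorted array of at most `num` elements per partition and merges the already-sorted arrays on the driver, instead of reducing `BoundedPriorityQueue`s. A simplified pure-Python model of that strategy (sequential rather than distributed, and `heapq.merge` stands in for `collectionUtils.mergeOrdered`):

```python
import heapq

def take_ordered(partitions, num):
    # Model of the new RDD.takeOrdered: each partition contributes at most
    # `num` sorted elements; the driver merges the sorted arrays pairwise,
    # truncating to `num` after every merge so memory stays bounded.
    if num == 0 or not partitions:
        return []
    tops = [sorted(p)[:num] for p in partitions]  # map side: per-partition top-k
    merged = tops[0]
    for arr in tops[1:]:                          # reduce side: ordered merge
        merged = list(heapq.merge(merged, arr))[:num]
    return merged

result = take_ordered([[9, 3, 7], [1, 8, 2], [5, 0, 6]], num=4)
```

Because every intermediate array is capped at `num`, the serialized result per task shrinks, which is exactly what avoids the `spark.driver` result-size failure shown in the benchmark.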
[spark] branch master updated (6e62b93f3d1 -> 6c3c9575cf0)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 6e62b93f3d1 [SPARK-39896][SQL] UnwrapCastInBinaryComparison should work when the literal of In/InSet downcast failed add 6c3c9575cf0 [SPARK-40271][FOLLOWUP][PYTHON] fix the minor exception message No new revisions were added by this update. Summary of changes: python/pyspark/sql/functions.py| 2 +- python/pyspark/sql/tests/test_functions.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[spark] branch master updated: [SPARK-40271][PYTHON] Support list type for `pyspark.sql.functions.lit`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 65d89f8e897 [SPARK-40271][PYTHON] Support list type for `pyspark.sql.functions.lit` 65d89f8e897 is described below commit 65d89f8e897449f7f026144a76328ff545fecde2 Author: itholic AuthorDate: Wed Aug 31 11:37:20 2022 +0800 [SPARK-40271][PYTHON] Support list type for `pyspark.sql.functions.lit` ### What changes were proposed in this pull request? This PR proposes to support `list` type for `pyspark.sql.functions.lit`. ### Why are the changes needed? To improve the API usability. ### Does this PR introduce _any_ user-facing change? Yes, now the `list` type is available for `pyspark.sql.functions.lit` as below: - Before ```python >>> spark.range(3).withColumn("c", lit([1,2,3])).show() Traceback (most recent call last): ... : org.apache.spark.SparkRuntimeException: [UNSUPPORTED_FEATURE.LITERAL_TYPE] The feature is not supported: Literal for '[1, 2, 3]' of class java.util.ArrayList.
at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:302) at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:100) at org.apache.spark.sql.functions$.lit(functions.scala:125) at org.apache.spark.sql.functions.lit(functions.scala) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) at java.base/java.lang.reflect.Method.invoke(Method.java:577) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Thread.java:833) ``` - After ```python >>> spark.range(3).withColumn("c", lit([1,2,3])).show() +---+-+ | id|c| +---+-+ | 0|[1, 2, 3]| | 1|[1, 2, 3]| | 2|[1, 2, 3]| +---+-+ ``` ### How was this patch tested? Added doctest & unit test. Closes #37722 from itholic/SPARK-40271. Authored-by: itholic Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/functions.py| 23 +-- python/pyspark/sql/tests/test_functions.py | 26 ++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 03c16db602f..e7a7a1b37f3 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -131,10 +131,13 @@ def lit(col: Any) -> Column: Parameters -- -col : :class:`~pyspark.sql.Column` or Python primitive type. +col : :class:`~pyspark.sql.Column`, str, int, float, bool or list. the value to make it as a PySpark literal. If a column is passed, it returns the column as is. +.. versionchanged:: 3.4.0 +Since 3.4.0, it supports the list type. 
+ Returns --- :class:`~pyspark.sql.Column` @@ -149,8 +152,24 @@ def lit(col: Any) -> Column: +--+---+ | 5| 0| +--+---+ + +Create a literal from a list. + +>>> spark.range(1).select(lit([1, 2, 3])).show() ++--+ +|array(1, 2, 3)| ++--+ +| [1, 2, 3]| ++--+ """ -return col if isinstance(col, Column) else _invoke_function("lit", col) +if isinstance(col, Column): +return col +elif isinstance(col, list): +if any(isinstance(c, Column) for c in col): +raise ValueError("lit does not allow for list of Columns") +return array(*[lit(item) for item in col]) +else: +return _invoke_function("lit", col) def col(col: str) -> Column: diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 102ebef8317..1d02a540558 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -962,6 +962,32 @@ class FunctionsTests(ReusedSQLTestCase): actual = self.spark.range(1).select(lit(td)).first()[0] self.assertEqual(actual, td) +def test_lit_list(self):
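The dispatch added to `lit` above can be modeled in plain Python: a `Column` passes through, a `list` recursively becomes an `array` of literals (rejecting lists of Columns), anything else becomes a literal. The `Column` class and tuple-based expression encoding below are stand-ins, not the real PySpark types:

```python
class Column:
    """Stand-in for pyspark.sql.Column (the real class differs)."""

def lit(value):
    # Mirrors the new branching in pyspark.sql.functions.lit
    if isinstance(value, Column):
        return value
    if isinstance(value, list):
        if any(isinstance(v, Column) for v in value):
            raise ValueError("lit does not allow for list of Columns")
        return ("array", [lit(v) for v in value])  # array(*[lit(item) ...])
    return ("lit", value)

expr = lit([1, 2, 3])

try:
    lit([Column()])
    rejects_columns = False
except ValueError:
    rejects_columns = True
```

The recursion means nested lists become nested arrays, which matches the `array(lit(item) for item in col)` construction in the diff.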
[spark] branch master updated (ca17135b288 -> e0cb2eb104e)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from ca17135b288 [SPARK-34777][UI] Fix StagePage input size/records not show when records greater than zero add e0cb2eb104e [SPARK-40135][PS] Support `data` mixed with `index` in DataFrame creation No new revisions were added by this update. Summary of changes: python/pyspark/pandas/frame.py| 177 +++ python/pyspark/pandas/tests/test_dataframe.py | 239 +- 2 files changed, 377 insertions(+), 39 deletions(-)
[spark] branch master updated (527ddece8fd -> 375b4e70029)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 527ddece8fd [SPARK-40247][SQL] Fix BitSet equality check add 375b4e70029 [SPARK-40243][DOCS] Enhance Hive UDF support documentation No new revisions were added by this update. Summary of changes: docs/sql-ref-functions-udf-hive.md | 12 1 file changed, 12 insertions(+)
[spark] branch master updated (f555a0d80e1 -> 4dd20ceb38b)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from f555a0d80e1 [SPARK-40240][PYTHON] PySpark rdd.takeSample should correctly validate `num > maxSampleSize` add 4dd20ceb38b [SPARK-40239][CORE] Remove duplicated 'fraction' validation in RDD.sample No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/rdd/RDD.scala | 1 - 1 file changed, 1 deletion(-)
[spark] branch master updated: [SPARK-40240][PYTHON] PySpark rdd.takeSample should correctly validate `num > maxSampleSize`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f555a0d80e1 [SPARK-40240][PYTHON] PySpark rdd.takeSample should correctly validate `num > maxSampleSize` f555a0d80e1 is described below commit f555a0d80e1858ca30527328ca240b56ae6f415e Author: Ruifeng Zheng AuthorDate: Sun Aug 28 07:27:13 2022 +0800 [SPARK-40240][PYTHON] PySpark rdd.takeSample should correctly validate `num > maxSampleSize` ### What changes were proposed in this pull request? Make PySpark's `rdd.takeSample` behave like the Scala side. ### Why are the changes needed? `rdd.takeSample` in Spark Core checks `num > maxsize - int(numStDev * sqrt(maxsize))` first, while PySpark may skip this validation: ```scala scala> sc.range(0, 10).takeSample(false, Int.MaxValue) java.lang.IllegalArgumentException: requirement failed: Cannot support a sample size > Int.MaxValue - 10.0 * math.sqrt(Int.MaxValue) at scala.Predef$.require(Predef.scala:281) at org.apache.spark.rdd.RDD.$anonfun$takeSample$1(RDD.scala:620) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:406) at org.apache.spark.rdd.RDD.takeSample(RDD.scala:615) ... 47 elided ``` ```python In [2]: sc.range(0, 10).takeSample(False, sys.maxsize) Out[2]: [9, 6, 8, 5, 7, 2, 0, 3, 4, 1] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? added doctest Closes #37683 from zhengruifeng/py_refine_takesample.
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/rdd.py | 16 ++-- 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index b631f141a89..5fe463233a2 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1122,6 +1122,7 @@ class RDD(Generic[T_co]): Examples +>>> import sys >>> rdd = sc.parallelize(range(0, 10)) >>> len(rdd.takeSample(True, 20, 1)) 20 @@ -1129,12 +1130,19 @@ class RDD(Generic[T_co]): 5 >>> len(rdd.takeSample(False, 15, 3)) 10 +>>> sc.range(0, 10).takeSample(False, sys.maxsize) +Traceback (most recent call last): +... +ValueError: Sample size cannot be greater than ... """ numStDev = 10.0 - +maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize)) if num < 0: raise ValueError("Sample size cannot be negative.") -elif num == 0: +elif num > maxSampleSize: +raise ValueError("Sample size cannot be greater than %d." % maxSampleSize) + +if num == 0 or self.getNumPartitions() == 0: return [] initialCount = self.count() @@ -1149,10 +1157,6 @@ class RDD(Generic[T_co]): rand.shuffle(samples) return samples -maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize)) -if num > maxSampleSize: -raise ValueError("Sample size cannot be greater than %d." % maxSampleSize) - fraction = RDD._computeFractionForSampleSize(num, initialCount, withReplacement) samples = self.sample(withReplacement, fraction, seed).collect()
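The fix above moves the `maxSampleSize` check ahead of the `num == 0` early return, so an oversized `num` is always rejected before any sampling work. A standalone sketch of just that validation (the surrounding sampling logic is omitted):

```python
import sys
from math import sqrt

def validate_sample_size(num):
    # Reordered validation from RDD.takeSample: the size-limit check now
    # runs before the num == 0 early return, matching the Scala side.
    numStDev = 10.0
    maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize))
    if num < 0:
        raise ValueError("Sample size cannot be negative.")
    elif num > maxSampleSize:
        raise ValueError("Sample size cannot be greater than %d." % maxSampleSize)
    return num

ok = validate_sample_size(0)

try:
    validate_sample_size(sys.maxsize)  # previously returned silently
    too_big_rejected = False
except ValueError:
    too_big_rejected = True
```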
[spark] branch master updated: [SPARK-40214][PYTHON][SQL] add 'get' to functions
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 295e98d29b3 [SPARK-40214][PYTHON][SQL] add 'get' to functions 295e98d29b3 is described below commit 295e98d29b34e2b472c375608b8782c3b9189444 Author: Ruifeng Zheng AuthorDate: Thu Aug 25 14:44:18 2022 +0800 [SPARK-40214][PYTHON][SQL] add 'get' to functions ### What changes were proposed in this pull request? expose `get` to dataframe functions ### Why are the changes needed? for function parity ### Does this PR introduce _any_ user-facing change? yes, new API ### How was this patch tested? added UT Closes #37652 from zhengruifeng/py_get. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../source/reference/pyspark.sql/functions.rst | 1 + python/pyspark/sql/functions.py| 70 ++ .../scala/org/apache/spark/sql/functions.scala | 11 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 38 4 files changed, 120 insertions(+) diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index a799bb8ad0a..027babbf57d 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -176,6 +176,7 @@ Collection Functions explode_outer posexplode posexplode_outer +get get_json_object json_tuple from_json diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index d59532f52cb..fd7a7247fc8 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -4832,6 +4832,10 @@ def element_at(col: "ColumnOrName", extraction: Any) -> Column: - The position is not zero based, but 1 based index. 
+See Also + +:meth:`get` + Examples >>> df = spark.createDataFrame([(["a", "b", "c"],)], ['data']) @@ -4845,6 +4849,72 @@ def element_at(col: "ColumnOrName", extraction: Any) -> Column: return _invoke_function_over_columns("element_at", col, lit(extraction)) +def get(col: "ColumnOrName", index: Union["ColumnOrName", int]) -> Column: +""" +Collection function: Returns element of array at given (0-based) index. +If the index points outside of the array boundaries, then this function +returns NULL. + +.. versionadded:: 3.4.0 + +Parameters +-- +col : :class:`~pyspark.sql.Column` or str +name of column containing array +index : :class:`~pyspark.sql.Column` or str or int +index to check for in array + +Notes +- +The position is not 1 based, but 0 based index. + +See Also + +:meth:`element_at` + +Examples + +>>> df = spark.createDataFrame([(["a", "b", "c"], 1)], ['data', 'index']) +>>> df.select(get(df.data, 1)).show() +++ +|get(data, 1)| +++ +| b| +++ + +>>> df.select(get(df.data, -1)).show() ++-+ +|get(data, -1)| ++-+ +| null| ++-+ + +>>> df.select(get(df.data, 3)).show() +++ +|get(data, 3)| +++ +|null| +++ + +>>> df.select(get(df.data, "index")).show() +++ +|get(data, index)| +++ +| b| +++ + +>>> df.select(get(df.data, col("index") - 1)).show() ++--+ +|get(data, (index - 1))| ++--+ +| a| ++--+ +""" +index = lit(index) if isinstance(index, int) else index + +return _invoke_function_over_columns("get", col, index) + + def array_remove(col: "ColumnOrName", element: Any) -> Column: """ Collection function: Remove all elements that equal to element from the given array. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index bd7473706ca..69da277d5e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3958,6 +3958,17 @@ object functions { ElementAt(column.expr, lit(value).expr) } + /** + * Returns element of array at given (0-based) index. If the index points + * outside of th
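The docstring examples above pin down the semantics of the new `get`: 0-based indexing, and any out-of-range index — including negative ones — yields NULL rather than an error. A list-based Python model of that behavior (`None` standing in for SQL NULL):

```python
def get(arr, index):
    # Model of the SQL `get` function: 0-based index, NULL (None) for
    # anything outside [0, len(arr)), including negative indexes.
    if arr is None or index is None:
        return None
    if 0 <= index < len(arr):
        return arr[index]
    return None

data = ["a", "b", "c"]
```

This is the key contrast with `element_at`, which is 1-based and treats negative indexes as counting from the end.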
[spark] branch master updated: [SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f8b15395cf3 [SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL f8b15395cf3 is described below commit f8b15395cf347b6c6c6a4a20077fdeb31bfabb24 Author: Ruifeng Zheng AuthorDate: Wed Aug 10 19:33:47 2022 +0800 [SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL ### What changes were proposed in this pull request? Implement the missing functions in PySpark: - call_udf - localtimestamp - map_contains_key - pmod After this PR, all functions in `org.apache.spark.sql.functions` can be found in `pyspark.sql.functions` or have equivalents (e.g. `not` -> `~`) ### Why are the changes needed? for function parity ### Does this PR introduce _any_ user-facing change? yes, 4 new APIs added ### How was this patch tested? added doctests Closes #37449 from zhengruifeng/py_func_parity. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../source/reference/pyspark.sql/functions.rst | 4 + python/pyspark/sql/functions.py| 138 - python/pyspark/sql/tests/test_functions.py | 7 +- 3 files changed, 142 insertions(+), 7 deletions(-) diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index ea495445426..a799bb8ad0a 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -84,6 +84,7 @@ Math Functions log10 log1p log2 +pmod pow rint round @@ -125,6 +126,7 @@ Datetime Functions quarter month last_day +localtimestamp minute months_between next_day @@ -188,6 +190,7 @@ Collection Functions flatten sequence array_repeat +map_contains_key map_keys map_values map_entries @@ -326,6 +329,7 @@ UDF ..
autosummary:: :toctree: api/ +call_udf pandas_udf udf unwrap_udt diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index e73c70d8ca0..9dd81145243 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1077,6 +1077,45 @@ def pow(col1: Union["ColumnOrName", float], col2: Union["ColumnOrName", float]) return _invoke_binary_math_function("pow", col1, col2) +def pmod(dividend: Union["ColumnOrName", float], divisor: Union["ColumnOrName", float]) -> Column: +""" +Returns the positive value of dividend mod divisor. + +.. versionadded:: 3.4.0 + +Parameters +-- +dividend : str, :class:`~pyspark.sql.Column` or float +the column that contains dividend, or the specified dividend value +divisor : str, :class:`~pyspark.sql.Column` or float +the column that contains divisor, or the specified divisor value + +Examples + +>>> from pyspark.sql.functions import pmod +>>> df = spark.createDataFrame([ +... (1.0, float('nan')), (float('nan'), 2.0), (10.0, 3.0), +... (float('nan'), float('nan')), (-3.0, 4.0), (-10.0, 3.0), +... (-5.0, -6.0), (7.0, -8.0), (1.0, 2.0)], +... ("a", "b")) +>>> df.select(pmod("a", "b")).show() ++--+ +|pmod(a, b)| ++--+ +| NaN| +| NaN| +| 1.0| +| NaN| +| 1.0| +| 2.0| +| -5.0| +| 7.0| +| 1.0| ++--+ +""" +return _invoke_binary_math_function("pmod", dividend, divisor) + + @since(1.6) def row_number() -> Column: """ @@ -1997,6 +2036,28 @@ def current_timestamp() -> Column: return _invoke_function("current_timestamp") +def localtimestamp() -> Column: +""" +Returns the current timestamp without time zone at the start of query evaluation +as a timestamp without time zone column. All calls of localtimestamp within the +same query return the same value. + +.. versionadded:: 3.4.0 + +Examples + +>>> from pyspark.sql.functions import localtimestamp +>>> df = spark.range(0, 100) +>>> df.select(localtimestamp()).distinct().show() +++ +|localtimestamp()| +++ +|20...-...-... 
...:...:...| +++ +""" +return _invoke_function(&
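The `pmod` docstring table above implies Java-style remainder semantics (sign follows the dividend), shifted into range only when negative — which differs from Python's `%` (sign follows the divisor). A pure-Python sketch using `math.fmod`, which has the Java-style sign behavior:

```python
import math

def pmod(dividend, divisor):
    # Sketch of SQL pmod: take the Java-style remainder (math.fmod keeps the
    # dividend's sign), then fold a negative remainder back into range.
    r = math.fmod(dividend, divisor)
    if r < 0:
        r = math.fmod(r + divisor, divisor)
    return r
```

With a negative divisor the result keeps the dividend's sign when the remainder is already non-negative, reproducing rows like `pmod(-5.0, -6.0) → -5.0` and `pmod(7.0, -8.0) → 7.0` from the docstring.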
[spark] branch master updated (527cce5dfde -> b012cb722f6)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 527cce5dfde [SPARK-40006][PYTHON][DOCS] Make pyspark.sql.group examples self-contained add b012cb722f6 [SPARK-40007][PYTHON][SQL] Add 'mode' to functions No new revisions were added by this update. Summary of changes: .../source/reference/pyspark.sql/functions.rst | 1 + python/pyspark/sql/functions.py| 34 ++ .../sql/catalyst/expressions/aggregate/Mode.scala  | 2 +- .../scala/org/apache/spark/sql/functions.scala | 8 + 4 files changed, 44 insertions(+), 1 deletion(-)
[spark] branch master updated (1fed903f408 -> 0eef968efa4)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 1fed903f408 [SPARK-40004][CORE] Remove redundant `LevelDB.get` in `RemoteBlockPushResolver` add 0eef968efa4 [SPARK-40003][PYTHON][SQL] Add 'median' to functions No new revisions were added by this update. Summary of changes: .../source/reference/pyspark.sql/functions.rst | 1 + python/pyspark/sql/functions.py| 34 ++ .../scala/org/apache/spark/sql/functions.scala | 8 + 3 files changed, 43 insertions(+)
[spark] branch master updated: [SPARK-39939][PYTHON][PS] return self.copy during calling shift with period == 0
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c9eba123990 [SPARK-39939][PYTHON][PS] return self.copy during calling shift with period == 0 c9eba123990 is described below commit c9eba123990e2e9d7d3a7f2f5c67e55115aa1731 Author: bzhaoop AuthorDate: Wed Aug 3 18:40:26 2022 +0800 [SPARK-39939][PYTHON][PS] return self.copy during calling shift with period == 0 PySpark raises an error when `shift` is called with `periods=0`, whereas pandas returns a copy of the same object. ### What changes were proposed in this pull request? Return `self.copy()` when `periods == 0`. ### Why are the changes needed? PySpark and pandas behave differently. PySpark: ``` >>> df = ps.DataFrame({'Col1': [10, 20, 15, 30, 45], 'Col2': [13, 23, 18, 33, 48],'Col3': [17, 27, 22, 37, 52]},columns=['Col1', 'Col2', 'Col3']) >>> df.Col1.shift(periods=3) 0 NaN 1 NaN 2 NaN 3 10.0 4 20.0 Name: Col1, dtype: float64 >>> df.Col1.shift(periods=0) Traceback (most recent call last): File "", line 1, in File "/home/spark/spark/python/pyspark/pandas/base.py", line 1170, in shift return self._shift(periods, fill_value).spark.analyzed File "/home/spark/spark/python/pyspark/pandas/spark/accessors.py", line 256, in analyzed return first_series(DataFrame(self._data._internal.resolved_copy)) File "/home/spark/spark/python/pyspark/pandas/utils.py", line 589, in wrapped_lazy_property setattr(self, attr_name, fn(self)) File "/home/spark/spark/python/pyspark/pandas/internal.py", line 1173, in resolved_copy sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS)) File "/home/spark/spark/python/pyspark/sql/dataframe.py", line 2073, in select jdf = self._jdf.select(self._jcols(*cols)) File
"/home/spark/.pyenv/versions/3.8.13/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__   return_value = get_return_value(  File "/home/spark/spark/python/pyspark/sql/utils.py", line 196, in deco   raise converted from None pyspark.sql.utils.AnalysisException: Cannot specify window frame for lag function ``` pandas: ``` >>> pdf = pd.DataFrame({'Col1': [10, 20, 15, 30, 45], 'Col2': [13, 23, 18, 33, 48],'Col3': [17, 27, 22, 37, 52]},columns=['Col1', 'Col2', 'Col3']) >>> pdf.Col1.shift(periods=3) 0   NaN 1   NaN 2   NaN 3   10.0 4   20.0 Name: Col1, dtype: float64 >>> pdf.Col1.shift(periods=0) 0   10 1   20 2   15 3   30 4   45 Name: Col1, dtype: int64 ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? call shift func with period == 0. Closes #37366 from bzhaoopenstack/period. Authored-by: bzhaoop Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/base.py | 3 +++ python/pyspark/pandas/tests/test_dataframe.py | 1 + python/pyspark/pandas/tests/test_series.py| 2 ++ 3 files changed, 6 insertions(+) diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py index 3430f5efa93..bf7149e6b23 100644 --- a/python/pyspark/pandas/base.py +++ b/python/pyspark/pandas/base.py @@ -1179,6 +1179,9 @@ class IndexOpsMixin(object, metaclass=ABCMeta): if not isinstance(periods, int): raise TypeError("periods should be an int; however, got [%s]" % type(periods).__name__) +if periods == 0: +return self.copy() + col = self.spark.column window = ( Window.partitionBy(*part_cols) diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py index 1361c44404a..add93faba0c 100644 --- a/python/pyspark/pandas/tests/test_dataframe.py +++ b/python/pyspark/pandas/tests/test_dataframe.py @@ -4249,6 +4249,7 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils): psdf.columns = columns self.assert_eq(pdf.shift(3), psdf.shift(3)) self.assert_eq(pdf.shift().shift(-1), 
psdf.shift().shift(-1)) +self.assert_eq(pdf.shift(0), psdf.shift(0)) def test_diff(self): pdf = pd.DataFrame( diff --git a/python/pyspark/pandas/tests/test_series.py b/python/pyspark/pandas/tests/test_series.py index 144df0f986a..6bc07def712 100644 --- a/python/pyspark/pandas/tests/test_series.py +++ b/python/pyspark/pandas/tests/test_series.py @@ -1549,6 +1549,8 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
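The semantics of the fix are easy to see outside Spark. Below is a minimal, pure-Python sketch of pandas-style `shift` (not Spark's window-based implementation): positive periods pad from the front, negative from the back, and `periods == 0` short-circuits to a copy instead of taking the lag-window path that raised the `AnalysisException` above.

```python
def shift(values, periods, fill_value=None):
    """Pandas-style shift over a plain list.

    Mirrors the SPARK-39939 behavior: periods == 0 returns a copy of
    the data rather than going through a lag window.
    """
    if not isinstance(periods, int):
        raise TypeError(
            "periods should be an int; however, got [%s]" % type(periods).__name__
        )
    if periods == 0:
        return list(values)  # same data, new object: the fixed behavior
    n = len(values)
    if periods > 0:
        # pad the front, drop the tail
        return [fill_value] * min(periods, n) + list(values[: max(n - periods, 0)])
    # negative periods: drop the front, pad the tail
    return list(values[-periods:]) + [fill_value] * min(-periods, n)

data = [10, 20, 15, 30, 45]
print(shift(data, 3))   # [None, None, None, 10, 20]
print(shift(data, 0))   # [10, 20, 15, 30, 45]
print(shift(data, -2))  # [15, 30, 45, None, None]
```

Note that `shift(data, 0)` returns an equal but distinct list, matching pandas' "return a copy" contract.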
[spark] branch master updated: [SPARK-39877][PYTHON] Add unpivot to PySpark DataFrame API
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8585fafab86 [SPARK-39877][PYTHON] Add unpivot to PySpark DataFrame API 8585fafab86 is described below commit 8585fafab8633c02e6f1b989acd2bbdb0eb1678e Author: Enrico Minack AuthorDate: Mon Aug 1 09:39:12 2022 +0800 [SPARK-39877][PYTHON] Add unpivot to PySpark DataFrame API ### What changes were proposed in this pull request? This adds `unpivot` and its alias `melt` to the PySpark API. It calls into Scala `Dataset.unpivot` (#36150). Small difference to Scala method signature is that PySpark method has default values. This is similar to `melt` in Spark Pandas API. ### Why are the changes needed? To support `unpivot` in Python. ### Does this PR introduce _any_ user-facing change? Yes, adds `DataFrame.unpivot` and `DataFrame.melt` to PySpark API. ### How was this patch tested? Added test to `test_dataframe.py`. Closes #37304 from EnricoMi/branch-pyspark-unpivot. 
Authored-by: Enrico Minack Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/dataframe.py| 134 +++ python/pyspark/sql/tests/test_dataframe.py | 144 + .../main/scala/org/apache/spark/sql/Dataset.scala | 11 ++ 3 files changed, 289 insertions(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 481dafa310d..8c9632fe766 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -2238,6 +2238,140 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): return GroupedData(jgd, self) +def unpivot( +self, +ids: Optional[Union["ColumnOrName", List["ColumnOrName"], Tuple["ColumnOrName", ...]]], +values: Optional[Union["ColumnOrName", List["ColumnOrName"], Tuple["ColumnOrName", ...]]], +variableColumnName: str, +valueColumnName: str, +) -> "DataFrame": +""" +Unpivot a DataFrame from wide format to long format, optionally leaving +identifier columns set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, +except for the aggregation, which cannot be reversed. + +This function is useful to massage a DataFrame into a format where some +columns are identifier columns ("ids"), while all other columns ("values") +are "unpivoted" to the rows, leaving just two non-id columns, named as given +by `variableColumnName` and `valueColumnName`. + +When no "id" columns are given, the unpivoted DataFrame consists of only the +"variable" and "value" columns. + +All "value" columns must share a least common data type. Unless they are the same data type, +all "value" columns are cast to the nearest common data type. For instance, types +`IntegerType` and `LongType` are cast to `LongType`, while `IntegerType` and `StringType` +do not have a common data type and `unpivot` fails. + +:func:`groupby` is an alias for :func:`groupBy`. + +.. versionadded:: 3.4.0 + +Parameters +-- +ids : str, Column, tuple, list, optional +Column(s) to use as identifiers. 
Can be a single column or column name,
+        or a list or tuple for multiple columns.
+    values : str, Column, tuple, list, optional
+        Column(s) to unpivot. Can be a single column or column name, or a list or tuple
+        for multiple columns. If not specified or empty, uses all columns that
+        are not set as `ids`.
+    variableColumnName : str
+        Name of the variable column.
+    valueColumnName : str
+        Name of the value column.
+
+    Returns
+    -------
+    DataFrame
+        Unpivoted DataFrame.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame(
+    ...     [(1, 11, 1.1), (2, 12, 1.2)],
+    ...     ["id", "int", "double"],
+    ... )
+    >>> df.show()
+    +---+---+------+
+    | id|int|double|
+    +---+---+------+
+    |  1| 11|   1.1|
+    |  2| 12|   1.2|
+    +---+---+------+
+
+    >>> df.unpivot("id", ["int", "double"], "var", "val").show()
+    +---+------+----+
+    | id|   var| val|
+    +---+------+----+
+    |  1|   int|11.0|
+    |  1|double| 1.1|
+    |  2|   int|12.0|
+    |  2|double| 1.2|
+    +---+------+----+
+    """
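The wide-to-long reshape that `unpivot` performs can be sketched in plain Python over a list of dict rows: each input row contributes one output row per "value" column, keeping the "id" columns and adding the variable/value pair. This sketch skips one thing Spark does — casting all values to their least common type (`11` becomes `11.0` in the example above).

```python
def unpivot(rows, ids, values, variable_col="var", value_col="val"):
    """Wide-to-long reshape over a list of dict rows, shaped like
    DataFrame.unpivot's output (without the common-type cast)."""
    out = []
    for row in rows:
        for v in values:
            rec = {k: row[k] for k in ids}  # keep identifier columns
            rec[variable_col] = v           # former column name
            rec[value_col] = row[v]         # former column value
            out.append(rec)
    return out

rows = [
    {"id": 1, "int": 11, "double": 1.1},
    {"id": 2, "int": 12, "double": 1.2},
]
long_rows = unpivot(rows, ids=["id"], values=["int", "double"])
for r in long_rows:
    print(r)
# {'id': 1, 'var': 'int', 'val': 11}
# {'id': 1, 'var': 'double', 'val': 1.1}
# {'id': 2, 'var': 'int', 'val': 12}
# {'id': 2, 'var': 'double', 'val': 1.2}
```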
[spark] branch master updated: [SPARK-39823][SQL][PYTHON] Rename Dataset.as as Dataset.to and add DataFrame.to in PySpark
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 91b95056806 [SPARK-39823][SQL][PYTHON] Rename Dataset.as as Dataset.to and add DataFrame.to in PySpark 91b95056806 is described below commit 91b950568066830ecd7a4581ab5bf4dbdbbeb474 Author: Ruifeng Zheng AuthorDate: Wed Jul 27 08:11:18 2022 +0800 [SPARK-39823][SQL][PYTHON] Rename Dataset.as as Dataset.to and add DataFrame.to in PySpark ### What changes were proposed in this pull request? 1, rename `Dataset.as(StructType)` to `Dataset.to(StructType)`, since `as` is a keyword in python, we dont want to use a different name; 2, Add `DataFrame.to(StructType)` in Python ### Why are the changes needed? for function parity ### Does this PR introduce _any_ user-facing change? yes, new api ### How was this patch tested? added UT Closes #37233 from zhengruifeng/py_ds_as. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../source/reference/pyspark.sql/dataframe.rst | 1 + python/pyspark/sql/dataframe.py| 50 + python/pyspark/sql/tests/test_dataframe.py | 36 ++- .../main/scala/org/apache/spark/sql/Dataset.scala | 4 +- ...emaSuite.scala => DataFrameToSchemaSuite.scala} | 52 +++--- 5 files changed, 114 insertions(+), 29 deletions(-) diff --git a/python/docs/source/reference/pyspark.sql/dataframe.rst b/python/docs/source/reference/pyspark.sql/dataframe.rst index 5b6e704ba48..8cf083e5dd4 100644 --- a/python/docs/source/reference/pyspark.sql/dataframe.rst +++ b/python/docs/source/reference/pyspark.sql/dataframe.rst @@ -102,6 +102,7 @@ DataFrame DataFrame.summary DataFrame.tail DataFrame.take +DataFrame.to DataFrame.toDF DataFrame.toJSON DataFrame.toLocalIterator diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index efebd05c08d..481dafa310d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1422,6 +1422,56 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): jc = self._jdf.colRegex(colName) return Column(jc) +def to(self, schema: StructType) -> "DataFrame": +""" +Returns a new :class:`DataFrame` where each row is reconciled to match the specified +schema. + +Notes +- +1, Reorder columns and/or inner fields by name to match the specified schema. + +2, Project away columns and/or inner fields that are not needed by the specified schema. +Missing columns and/or inner fields (present in the specified schema but not input +DataFrame) lead to failures. + +3, Cast the columns and/or inner fields to match the data types in the specified schema, +if the types are compatible, e.g., numeric to numeric (error if overflows), but not string +to int. + +4, Carry over the metadata from the specified schema, while the columns and/or inner fields +still keep their own metadata if not overwritten by the specified schema. + +5, Fail if the nullability is not compatible. 
For example, the column and/or inner field +is nullable but the specified schema requires them to be not nullable. + +.. versionadded:: 3.4.0 + +Parameters +-- +schema : :class:`StructType` +Specified schema. + +Examples + +>>> df = spark.createDataFrame([("a", 1)], ["i", "j"]) +>>> df.schema +StructType([StructField('i', StringType(), True), StructField('j', LongType(), True)]) +>>> schema = StructType([StructField("j", StringType()), StructField("i", StringType())]) +>>> df2 = df.to(schema) +>>> df2.schema +StructType([StructField('j', StringType(), True), StructField('i', StringType(), True)]) +>>> df2.show() ++---+---+ +| j| i| ++---+---+ +| 1| a| ++---+---+ +""" +assert schema is not None +jschema = self._jdf.sparkSession().parseDataType(schema.json()) +return DataFrame(self._jdf.to(jschema), self.sparkSession) + def alias(self, alias: str) -> "DataFrame": """Returns a new :class:`DataFrame` with an alias set. diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index ac6b6f68aed..7c7d3d1e51c 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_datafram
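The reconciliation contract described in the notes above — reorder by name, project away extras, fail on missing columns, cast compatible types — can be sketched for a single row as a plain function. This is an illustrative model, not Spark's implementation; the `caster` callables stand in for Spark's type-compatibility checks.

```python
def reconcile(row, schema):
    """Reconcile one dict "row" to a target schema given as
    (name, caster) pairs: reorder/project columns by name, cast each
    value, and fail on columns the schema requires but the row lacks."""
    out = {}
    for name, caster in schema:
        if name not in row:
            # missing columns lead to failures, per note 2 above
            raise ValueError("missing column: %s" % name)
        out[name] = caster(row[name])
    return out

schema = [("j", str), ("i", str)]            # target order and types
row = {"i": "a", "j": 1, "extra": True}      # "extra" is projected away
print(reconcile(row, schema))                # {'j': '1', 'i': 'a'}
```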
[spark] branch master updated: [SPARK-39760][PYTHON] Support Varchar in PySpark
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 08808fb5079 [SPARK-39760][PYTHON] Support Varchar in PySpark 08808fb5079 is described below commit 08808fb507947b51ea7656496612a81e11fe66bd Author: Ruifeng Zheng AuthorDate: Mon Jul 18 15:55:55 2022 +0800 [SPARK-39760][PYTHON] Support Varchar in PySpark ### What changes were proposed in this pull request? Support Varchar in PySpark ### Why are the changes needed? function parity ### Does this PR introduce _any_ user-facing change? yes, new datatype supported ### How was this patch tested? 1, added UT; 2, manually check against the scala side: ```python In [1]: from pyspark.sql.types import * ...: from pyspark.sql.functions import * ...: ...: df = spark.createDataFrame([(1,), (11,)], ["value"]) ...: ret = df.select(col("value").cast(VarcharType(10))).collect() ...: 22/07/13 17:17:07 WARN CharVarcharUtils: The Spark cast operator does not support char/varchar type and simply treats them as string type. Please use string type directly to avoid confusion. 
Otherwise, you can set spark.sql.legacy.charVarcharAsString to true, so that Spark treat them as string type as same as Spark 3.0 and earlier In [2]: In [2]: schema = StructType([StructField("a", IntegerType(), True), (StructField("v", VarcharType(10), True))]) ...: description = "this a table created via Catalog.createTable()" ...: table = spark.catalog.createTable("tab3_via_catalog", schema=schema, description=description) ...: table.schema ...: Out[2]: StructType([StructField('a', IntegerType(), True), StructField('v', StringType(), True)]) ``` ```scala scala> import org.apache.spark.sql.types._ import org.apache.spark.sql.types._ scala> import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._ scala> val df = spark.range(0, 10).selectExpr(" id AS value") df: org.apache.spark.sql.DataFrame = [value: bigint] scala> val ret = df.select(col("value").cast(VarcharType(10))).collect() 22/07/13 17:28:56 WARN CharVarcharUtils: The Spark cast operator does not support char/varchar type and simply treats them as string type. Please use string type directly to avoid confusion. 
Otherwise, you can set spark.sql.legacy.charVarcharAsString to true, so that Spark treat them as string type as same as Spark 3.0 and earlier ret: Array[org.apache.spark.sql.Row] = Array([0], [1], [2], [3], [4], [5], [6], [7], [8], [9]) scala> scala> val schema = StructType(StructField("a", IntegerType, true) :: (StructField("v", VarcharType(10), true) :: Nil)) schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true),StructField(v,VarcharType(10),true)) scala> val description = "this a table created via Catalog.createTable()" description: String = this a table created via Catalog.createTable() scala> val table = spark.catalog.createTable("tab3_via_catalog", source="json", schema=schema, description=description, options=Map.empty[String, String]) table: org.apache.spark.sql.DataFrame = [a: int, v: string] scala> table.schema res0: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true),StructField(v,StringType,true)) ``` Closes #37173 from zhengruifeng/py_add_varchar. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../source/reference/pyspark.sql/data_types.rst| 1 + python/pyspark/sql/tests/test_types.py | 26 +++- python/pyspark/sql/types.py| 46 -- 3 files changed, 68 insertions(+), 5 deletions(-) diff --git a/python/docs/source/reference/pyspark.sql/data_types.rst b/python/docs/source/reference/pyspark.sql/data_types.rst index d146c640477..775f0bf394a 100644 --- a/python/docs/source/reference/pyspark.sql/data_types.rst +++ b/python/docs/source/reference/pyspark.sql/data_types.rst @@ -40,6 +40,7 @@ Data Types NullType ShortType StringType +VarcharType StructField StructType TimestampType diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py index ef0ad82dbb9..218cfc413db 100644 --- a/python/pyspark/sql/tests/test_types.py +++ b/python/pyspark/sql/tests/test_types.py @@ -38,6 +38,7 @@ from pyspark.sql.types import ( DayTimeIntervalType, MapType, StringType, +VarcharType, Str
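To make the shape of the new type concrete, here is a minimal stand-in for the `VarcharType` added to `pyspark.sql.types` — a string type that carries a maximum length and renders as `varchar(n)`. This is a sketch for illustration, not the PySpark class itself (which also participates in schema JSON serialization).

```python
class VarcharType:
    """Minimal stand-in for pyspark.sql.types.VarcharType:
    a string type parameterized by a maximum length."""

    def __init__(self, length: int):
        self.length = length

    def simpleString(self) -> str:
        # the SQL-facing name, e.g. "varchar(10)"
        return "varchar(%d)" % self.length

    def __repr__(self) -> str:
        return "VarcharType(%d)" % self.length

v = VarcharType(10)
print(v.simpleString())  # varchar(10)
```

As the warning in the commit message notes, Spark's cast operator treats char/varchar as plain string internally, which is why the created table's schema reports `StringType` for the varchar column.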
[spark] branch master updated: [SPARK-39723][R] Implement functionExists/getFunc in SparkR for 3L namespace
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new aa51da42908 [SPARK-39723][R] Implement functionExists/getFunc in SparkR for 3L namespace aa51da42908 is described below commit aa51da4290814bf3ccdc52000b8d90d6db575d3f Author: Ruifeng Zheng AuthorDate: Tue Jul 12 11:05:25 2022 +0800 [SPARK-39723][R] Implement functionExists/getFunc in SparkR for 3L namespace ### What changes were proposed in this pull request? 1, implement functionExists/getFunc in SparkR 2, update doc of ListFunctions ### Why are the changes needed? for 3L namespace ### Does this PR introduce _any_ user-facing change? yes, new API functionExists ### How was this patch tested? added UT Closes #37135 from zhengruifeng/r_3L_func. Lead-authored-by: Ruifeng Zheng Co-authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- R/pkg/NAMESPACE | 2 + R/pkg/R/catalog.R | 75 ++- R/pkg/pkgdown/_pkgdown_template.yml | 2 + R/pkg/tests/fulltests/test_sparkSQL.R | 34 +++- 4 files changed, 111 insertions(+), 2 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 3937791421a..e078ba0c2cd 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -479,7 +479,9 @@ export("as.DataFrame", "databaseExists", "dropTempTable", "dropTempView", + "functionExists", "getDatabase", + "getFunc", "getTable", "listCatalogs", "listColumns", diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index 680415ea6cd..942af4de3c0 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -583,13 +583,14 @@ listColumns <- function(tableName, databaseName = NULL) { #' This includes all temporary functions. #' #' @param databaseName (optional) name of the database +#' The database name can be qualified with catalog name since 3.4.0. #' @return a SparkDataFrame of the list of function descriptions. 
#' @rdname listFunctions #' @name listFunctions #' @examples #' \dontrun{ #' sparkR.session() -#' listFunctions() +#' listFunctions(spark_catalog.default) #' } #' @note since 2.2.0 listFunctions <- function(databaseName = NULL) { @@ -606,6 +607,78 @@ listFunctions <- function(databaseName = NULL) { dataFrame(callJMethod(jdst, "toDF")) } +#' Checks if the function with the specified name exists. +#' +#' Checks if the function with the specified name exists. +#' +#' @param functionName name of the function, allowed to be qualified with catalog name +#' @rdname functionExists +#' @name functionExists +#' @examples +#' \dontrun{ +#' sparkR.session() +#' functionExists("spark_catalog.default.myFunc") +#' } +#' @note since 3.4.0 +functionExists <- function(functionName) { + sparkSession <- getSparkSession() + if (class(functionName) != "character") { +stop("functionName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + callJMethod(catalog, "functionExists", functionName) +} + +#' Get the function with the specified name +#' +#' Get the function with the specified name +#' +#' @param functionName name of the function, allowed to be qualified with catalog name +#' @return A named list. +#' @rdname getFunc +#' @name getFunc +#' @examples +#' \dontrun{ +#' sparkR.session() +#' func <- getFunc("spark_catalog.default.myFunc") +#' } +#' @note since 3.4.0. Use different name with the scala/python side, to avoid the +#' signature conflict with built-in "getFunction". 
+getFunc <- function(functionName) { + sparkSession <- getSparkSession() + if (class(functionName) != "character") { +stop("functionName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + jfunc <- handledCallJMethod(catalog, "getFunction", functionName) + + ret <- list(name = callJMethod(jfunc, "name")) + jcata <- callJMethod(jfunc, "catalog") + if (is.null(jcata)) { +ret$catalog <- NA + } else { +ret$catalog <- jcata + } + + jns <- callJMethod(jfunc, "namespace") + if (is.null(jns)) { +ret$namespace <- NA + } else { +ret$namespace <- jns + } + + jdesc <- callJMethod(jfunc, "description") + if (is.null(jdesc)) { +ret$description <- NA + } else { +ret$description <- jdesc + } + + ret$className <- callJMethod(jfunc, "className") + ret$isTemporary <- callJMethod(jfunc, "isTemporary"
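The R `getFunc` above builds a named list from the JVM `Function` handle, substituting `NA` for fields the JVM side reports as null. A Python sketch of the same null-to-NA coercion is below; `jfunc` is a hypothetical stand-in for the Java handle, and the string `"NA"` stands in for R's `NA`.

```python
from types import SimpleNamespace

NA = "NA"  # stand-in for R's NA sentinel

def function_info(jfunc):
    """Build the getFunc-style record, coercing null (None) fields
    that may be absent -- catalog, namespace, description -- to NA."""
    def or_na(value):
        return NA if value is None else value
    return {
        "name": jfunc.name,
        "catalog": or_na(jfunc.catalog),
        "namespace": or_na(jfunc.namespace),
        "description": or_na(jfunc.description),
        "className": jfunc.className,
        "isTemporary": jfunc.isTemporary,
    }

# hypothetical handle for a permanent function with no catalog/description
jfunc = SimpleNamespace(
    name="myFunc", catalog=None, namespace=["default"],
    description=None, className="org.example.MyFunc", isTemporary=False,
)
print(function_info(jfunc))
```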
[spark] branch master updated: [SPARK-39719][R] Implement databaseExists/getDatabase in SparkR support 3L namespace
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d5e9c5801cb [SPARK-39719][R] Implement databaseExists/getDatabase in SparkR support 3L namespace d5e9c5801cb is described below commit d5e9c5801cb1d0c8cb545b679261bd94b5ae0280 Author: Ruifeng Zheng AuthorDate: Mon Jul 11 14:23:12 2022 +0800 [SPARK-39719][R] Implement databaseExists/getDatabase in SparkR support 3L namespace ### What changes were proposed in this pull request? 1, add `databaseExists`/`getDatabase` 2, make sure `listTables` support 3L namespace 3, modify sparkR-specific catalog method `tables` and `tableNames` to support 3L namespace ### Why are the changes needed? to support 3L namespace in SparkR ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? updated UT and manual check Closes #37132 from zhengruifeng/r_3L_dbname. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- R/pkg/NAMESPACE| 2 + R/pkg/R/catalog.R | 72 +- R/pkg/pkgdown/_pkgdown_template.yml| 2 + R/pkg/tests/fulltests/test_sparkSQL.R | 34 +- .../org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- 5 files changed, 107 insertions(+), 5 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index f5f60ecf134..3937791421a 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -476,8 +476,10 @@ export("as.DataFrame", "createTable", "currentCatalog", "currentDatabase", + "databaseExists", "dropTempTable", "dropTempView", + "getDatabase", "getTable", "listCatalogs", "listColumns", diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index 8237ac26b33..680415ea6cd 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -278,13 +278,14 @@ dropTempView <- function(viewName) { #' Returns a SparkDataFrame containing names of tables in the given database. 
#' #' @param databaseName (optional) name of the database +#' The database name can be qualified with catalog name since 3.4.0. #' @return a SparkDataFrame #' @rdname tables #' @seealso \link{listTables} #' @examples #'\dontrun{ #' sparkR.session() -#' tables("hive") +#' tables("spark_catalog.hive") #' } #' @name tables #' @note tables since 1.4.0 @@ -298,12 +299,13 @@ tables <- function(databaseName = NULL) { #' Returns the names of tables in the given database as an array. #' #' @param databaseName (optional) name of the database +#' The database name can be qualified with catalog name since 3.4.0. #' @return a list of table names #' @rdname tableNames #' @examples #'\dontrun{ #' sparkR.session() -#' tableNames("hive") +#' tableNames("spark_catalog.hive") #' } #' @name tableNames #' @note tableNames since 1.4.0 @@ -356,6 +358,28 @@ setCurrentDatabase <- function(databaseName) { invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName)) } +#' Checks if the database with the specified name exists. +#' +#' Checks if the database with the specified name exists. +#' +#' @param databaseName name of the database, allowed to be qualified with catalog name +#' @rdname databaseExists +#' @name databaseExists +#' @examples +#' \dontrun{ +#' sparkR.session() +#' databaseExists("spark_catalog.default") +#' } +#' @note since 3.4.0 +databaseExists <- function(databaseName) { + sparkSession <- getSparkSession() + if (class(databaseName) != "character") { +stop("databaseName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + callJMethod(catalog, "databaseExists", databaseName) +} + #' Returns a list of databases available #' #' Returns a list of databases available. 
@@ -375,12 +399,54 @@ listDatabases <- function() { dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF")) } +#' Get the database with the specified name +#' +#' Get the database with the specified name +#' +#' @param databaseName name of the database, allowed to be qualified with catalog name +#' @return A named list. +#' @rdname getDatabase +#' @name getDatabase +#' @examples +#' \dontrun{ +#' sparkR.session() +#' db <- getDatabase("default") +#' } +#' @note since 3.4.0 +getDatabase <- function(databaseName) { + sparkSession <- getSparkSession() + if (class(databaseName) != "character") { +stop("databaseName must
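The "qualified with catalog name" convention that `databaseExists`/`getDatabase` now accept can be sketched as a small name parser: a bare name resolves against the default catalog, a two-part name carries its own catalog. This is an illustrative sketch of the 3-layer-namespace convention, not Spark's parser (which also handles backtick-quoted identifiers).

```python
def parse_db_name(name, default_catalog="spark_catalog"):
    """Split a possibly catalog-qualified database name into
    (catalog, database)."""
    parts = name.split(".")
    if len(parts) == 1:
        return (default_catalog, parts[0])   # unqualified: "default"
    if len(parts) == 2:
        return (parts[0], parts[1])          # qualified: "spark_catalog.default"
    raise ValueError("invalid database name: " + name)

print(parse_db_name("spark_catalog.default"))  # ('spark_catalog', 'default')
print(parse_db_name("hive"))                   # ('spark_catalog', 'hive')
```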
[spark] branch master updated: [SPARK-39720][R] Implement tableExists/getTable in SparkR for 3L namespace
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c2f8f555dd7 [SPARK-39720][R] Implement tableExists/getTable in SparkR for 3L namespace c2f8f555dd7 is described below commit c2f8f555dd7a067625455b66c207cbfd113a8e6e Author: Ruifeng Zheng AuthorDate: Mon Jul 11 12:02:00 2022 +0800 [SPARK-39720][R] Implement tableExists/getTable in SparkR for 3L namespace ### What changes were proposed in this pull request? 1, Implement tableExists/getTable 2, Update the documents of createTable/cacheTable/uncacheTable/refreshTable/recoverPartitions/listColumns ### Why are the changes needed? for 3L namespace ### Does this PR introduce _any_ user-facing change? yes, new method `tableExists` ### How was this patch tested? updated UT Closes #37133 from zhengruifeng/r_3L_tblname. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- R/pkg/NAMESPACE | 2 + R/pkg/R/catalog.R | 87 +-- R/pkg/pkgdown/_pkgdown_template.yml | 2 + R/pkg/tests/fulltests/test_sparkSQL.R | 46 ++ 4 files changed, 125 insertions(+), 12 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 570f721ab41..f5f60ecf134 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -478,6 +478,7 @@ export("as.DataFrame", "currentDatabase", "dropTempTable", "dropTempView", + "getTable", "listCatalogs", "listColumns", "listDatabases", @@ -503,6 +504,7 @@ export("as.DataFrame", "spark.getSparkFiles", "sql", "str", + "tableExists", "tableToDF", "tableNames", "tables", diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index b10f73fb340..8237ac26b33 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -118,6 +118,7 @@ createExternalTable <- function(tableName, path = NULL, source = NULL, schema = #' #' @param tableName the qualified or unqualified name that designates a table. 
If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. #' @param path (optional) the path of files to load. #' @param source (optional) the name of the data source. #' @param schema (optional) the schema of the data required for some data sources. @@ -129,7 +130,7 @@ createExternalTable <- function(tableName, path = NULL, source = NULL, schema = #' sparkR.session() #' df <- createTable("myjson", path="path/to/json", source="json", schema) #' -#' createTable("people", source = "json", schema = schema) +#' createTable("spark_catalog.default.people", source = "json", schema = schema) #' insertInto(df, "people") #' } #' @name createTable @@ -160,6 +161,7 @@ createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, .. #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. #' @return SparkDataFrame #' @rdname cacheTable #' @examples @@ -184,6 +186,7 @@ cacheTable <- function(tableName) { #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. #' @return SparkDataFrame #' @rdname uncacheTable #' @examples @@ -403,6 +406,78 @@ listTables <- function(databaseName = NULL) { dataFrame(callJMethod(jdst, "toDF")) } +#' Checks if the table with the specified name exists. +#' +#' Checks if the table with the specified name exists. 
+#' +#' @param tableName name of the table, allowed to be qualified with catalog name +#' @rdname tableExists +#' @name tableExists +#' @examples +#' \dontrun{ +#' sparkR.session() +#' databaseExists("spark_catalog.default.myTable") +#' } +#' @note since 3.4.0 +tableExists <- function(tableName) { + sparkSession <- getSparkSession() + if (class(tableName) != "character") { +stop("tableName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + callJMethod(catalog, "tableExists", tableName) +} + +
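Table names extend the same convention to three parts. A sketch of resolving a 1-, 2-, or 3-part identifier to a full `(catalog, database, table)` triple — again an illustration of the convention, not Spark's analyzer, which also consults the catalog manager and handles quoting:

```python
def parse_table_name(name, current_catalog="spark_catalog", current_db="default"):
    """Resolve a possibly qualified table identifier to
    (catalog, database, table) under the 3L-namespace convention."""
    parts = name.split(".")
    if len(parts) == 1:
        return (current_catalog, current_db, parts[0])
    if len(parts) == 2:
        return (current_catalog, parts[0], parts[1])
    if len(parts) == 3:
        return (parts[0], parts[1], parts[2])
    raise ValueError("invalid table name: " + name)

print(parse_table_name("myTable"))
# ('spark_catalog', 'default', 'myTable')
print(parse_table_name("spark_catalog.default.myTable"))
# ('spark_catalog', 'default', 'myTable')
```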
[spark] branch master updated: [SPARK-39716][R] Make currentDatabase/setCurrentDatabase/listCatalogs in SparkR support 3L namespace
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 14ddcfcc44f [SPARK-39716][R] Make currentDatabase/setCurrentDatabase/listCatalogs in SparkR support 3L namespace 14ddcfcc44f is described below commit 14ddcfcc44fa28b7473092a9567230022c4b01c7 Author: Ruifeng Zheng AuthorDate: Fri Jul 8 18:06:53 2022 +0800 [SPARK-39716][R] Make currentDatabase/setCurrentDatabase/listCatalogs in SparkR support 3L namespace ### What changes were proposed in this pull request? add currentDatabase/setCurrentDatabase/listCatalogs ### Why are the changes needed? to support 3L namespace in SparkR ### Does this PR introduce _any_ user-facing change? yes, new API added ### How was this patch tested? added UT Closes #37127 from zhengruifeng/r_3L_catalog. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- R/pkg/NAMESPACE | 3 ++ R/pkg/R/catalog.R | 60 +++ R/pkg/pkgdown/_pkgdown_template.yml | 5 ++- R/pkg/tests/fulltests/test_sparkSQL.R | 11 +++ 4 files changed, 78 insertions(+), 1 deletion(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 6e0557cff88..570f721ab41 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -474,9 +474,11 @@ export("as.DataFrame", "createDataFrame", "createExternalTable", "createTable", + "currentCatalog", "currentDatabase", "dropTempTable", "dropTempView", + "listCatalogs", "listColumns", "listDatabases", "listFunctions", @@ -493,6 +495,7 @@ export("as.DataFrame", "refreshByPath", "refreshTable", "setCheckpointDir", + "setCurrentCatalog", "setCurrentDatabase", "spark.lapply", "spark.addFile", diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index 275737f804b..b10f73fb340 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -17,6 +17,66 @@ # catalog.R: SparkSession catalog functions +#' Returns the current default catalog +#' +#' Returns the current default 
catalog. +#' +#' @return name of the current default catalog. +#' @rdname currentCatalog +#' @name currentCatalog +#' @examples +#' \dontrun{ +#' sparkR.session() +#' currentCatalog() +#' } +#' @note since 3.4.0 +currentCatalog <- function() { + sparkSession <- getSparkSession() + catalog <- callJMethod(sparkSession, "catalog") + callJMethod(catalog, "currentCatalog") +} + +#' Sets the current default catalog +#' +#' Sets the current default catalog. +#' +#' @param catalogName name of the catalog +#' @rdname setCurrentCatalog +#' @name setCurrentCatalog +#' @examples +#' \dontrun{ +#' sparkR.session() +#' setCurrentCatalog("spark_catalog") +#' } +#' @note since 3.4.0 +setCurrentCatalog <- function(catalogName) { + sparkSession <- getSparkSession() + if (class(catalogName) != "character") { +stop("catalogName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + invisible(handledCallJMethod(catalog, "setCurrentCatalog", catalogName)) +} + +#' Returns a list of catalog available +#' +#' Returns a list of catalog available. +#' +#' @return a SparkDataFrame of the list of catalog. 
+#' @rdname listCatalogs +#' @name listCatalogs +#' @examples +#' \dontrun{ +#' sparkR.session() +#' listCatalogs() +#' } +#' @note since 3.4.0 +listCatalogs <- function() { + sparkSession <- getSparkSession() + catalog <- callJMethod(sparkSession, "catalog") + dataFrame(callJMethod(callJMethod(catalog, "listCatalogs"), "toDF")) +} + #' (Deprecated) Create an external table #' #' Creates an external table based on the dataset in a data source, diff --git a/R/pkg/pkgdown/_pkgdown_template.yml b/R/pkg/pkgdown/_pkgdown_template.yml index eeb676befbc..d487b51ec5d 100644 --- a/R/pkg/pkgdown/_pkgdown_template.yml +++ b/R/pkg/pkgdown/_pkgdown_template.yml @@ -261,9 +261,11 @@ reference: - title: "SQL Catalog" - contents: + - currentCatalog - currentDatabase - dropTempTable - dropTempView + - listCatalogs - listColumns - listDatabases - listFunctions @@ -271,6 +273,8 @@ reference: - refreshByPath - refreshTable - recoverPartitions + - setCurrentCatalog + - setCurrentDatabase - tableNames - tables - uncacheTable @@ -283,7 +287,6 @@ reference: - getLocalProperty - install.spark - setCheckpointDir - - setCurrentDatabase - setJobDe
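The three new SparkR functions are thin wrappers over session state: a current-catalog setting plus a list of registered catalogs, with the same string type check the R code performs (`class(catalogName) != "character"`). A toy Python model of that state, purely for illustration:

```python
class CatalogManager:
    """Toy model of the session state behind currentCatalog() /
    setCurrentCatalog() / listCatalogs(). Not Spark's CatalogManager."""

    def __init__(self, catalogs=("spark_catalog",)):
        self._catalogs = set(catalogs)
        self._current = "spark_catalog"

    def current_catalog(self):
        return self._current

    def set_current_catalog(self, name):
        if not isinstance(name, str):  # mirrors the R-side type check
            raise TypeError("catalogName must be a string.")
        if name not in self._catalogs:
            raise ValueError("catalog not found: " + name)
        self._current = name

    def list_catalogs(self):
        return sorted(self._catalogs)

cm = CatalogManager(catalogs=("spark_catalog", "my_cat"))
print(cm.current_catalog())   # spark_catalog
cm.set_current_catalog("my_cat")
print(cm.current_catalog())   # my_cat
```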
[spark] branch master updated: [SPARK-39579][PYTHON][FOLLOWUP] fix functionExists(functionName, dbName) when dbName is not None
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 1100d75f53c [SPARK-39579][PYTHON][FOLLOWUP] fix functionExists(functionName, dbName) when dbName is not None
1100d75f53c is described below

commit 1100d75f53c16f44dd414b8a0be477760420507d
Author: Ruifeng Zheng
AuthorDate: Tue Jul 5 19:53:13 2022 +0800

    [SPARK-39579][PYTHON][FOLLOWUP] fix functionExists(functionName, dbName) when dbName is not None

    ### What changes were proposed in this pull request?
    Fix functionExists(functionName, dbName).

    ### Why are the changes needed?
    https://github.com/apache/spark/pull/36977 introduced a bug in `functionExists(functionName, dbName)`: when dbName is not None, it should call `self._jcatalog.functionExists(dbName, functionName)`.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Existing test suite.

    Closes #37088 from zhengruifeng/py_3l_fix_functionExists.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Ruifeng Zheng
---
 python/pyspark/sql/catalog.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 42c040c284b..7efaf14eb82 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -359,7 +359,7 @@ class Catalog:
                 "a future version. Use functionExists(`dbName.tableName`) instead.",
                 FutureWarning,
             )
-            return self._jcatalog.functionExists(self.currentDatabase(), functionName)
+            return self._jcatalog.functionExists(dbName, functionName)

     def getFunction(self, functionName: str) -> Function:
         """Get the function with the specified name. This function can be a temporary function or a
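The bug being fixed is easy to reproduce in miniature: the buggy code looked the function up in `currentDatabase()` even when an explicit `dbName` was passed. A minimal model of `Catalog.functionExists` with the corrected dispatch (not PySpark's actual class — the JVM-backed lookup is replaced by a plain dict):

```python
class Catalog:
    """Minimal model of pyspark.sql.catalog.Catalog.functionExists,
    showing the SPARK-39579 follow-up: with an explicit dbName, look
    in that database, not in currentDatabase()."""

    def __init__(self, functions, current_db="default"):
        self._functions = functions  # {db_name: {function_name, ...}}
        self._current_db = current_db

    def currentDatabase(self):
        return self._current_db

    def functionExists(self, functionName, dbName=None):
        if dbName is None:
            # the buggy version used currentDatabase() unconditionally
            dbName = self.currentDatabase()
        return functionName in self._functions.get(dbName, set())

cat = Catalog({"default": {"f1"}, "other": {"myFunc"}})
print(cat.functionExists("f1"))              # True  (current database)
print(cat.functionExists("myFunc", "other")) # True  (explicit dbName honored)
print(cat.functionExists("myFunc"))          # False (not in "default")
```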
[spark] branch master updated: [SPARK-39646][SQL] Make setCurrentDatabase compatible with 3 layer namespace
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cbb4e7da692 [SPARK-39646][SQL] Make setCurrentDatabase compatible with 3 layer namespace cbb4e7da692 is described below commit cbb4e7da6924f62a6b5272a5684faac9f132fcfd Author: Zach Schuermann AuthorDate: Sat Jul 2 08:26:40 2022 +0800 [SPARK-39646][SQL] Make setCurrentDatabase compatible with 3 layer namespace ### What changes were proposed in this pull request? Change `setCurrentDatabase` catalog API to support 3 layer namespace. We use `sparkSession.sessionState.catalogManager.currentNamespace` for the currentDatabase now. ### Why are the changes needed? `setCurrentDatabase` doesn't support 3 layer namespace. ### Does this PR introduce _any_ user-facing change? Yes. This PR introduces a backwards-compatible API change to support 3 layer namespace (e.g. catalog.database.table) for `setCurrentDatabase`. ### How was this patch tested? UT Closes #36969 from schuermannator/3l-setCurrentDatabse. 
Authored-by: Zach Schuermann Signed-off-by: Ruifeng Zheng --- R/pkg/tests/fulltests/test_sparkSQL.R | 4 +-- .../apache/spark/sql/internal/CatalogImpl.scala| 9 --- .../apache/spark/sql/internal/CatalogSuite.scala | 29 ++ 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 2acb7a9ceba..0f984d0022a 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -4015,8 +4015,8 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", { expect_equal(currentDatabase(), "default") expect_error(setCurrentDatabase("default"), NA) expect_error(setCurrentDatabase("zxwtyswklpf"), - paste0("Error in setCurrentDatabase : analysis error - Database ", - "'zxwtyswklpf' does not exist")) + paste0("Error in setCurrentDatabase : no such database - Database ", + "'zxwtyswklpf' not found")) dbs <- collect(listDatabases()) expect_equal(names(dbs), c("name", "catalog", "description", "locationUri")) expect_equal(which(dbs[, 1] == "default"), 1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 742ca5ccb1e..c276fbb677c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -59,15 +59,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { /** * Returns the current default database in this session. */ - override def currentDatabase: String = sessionCatalog.getCurrentDatabase + override def currentDatabase: String = +sparkSession.sessionState.catalogManager.currentNamespace.toSeq.quoted /** * Sets the current default database in this session. 
*/ @throws[AnalysisException]("database does not exist") override def setCurrentDatabase(dbName: String): Unit = { -requireDatabaseExists(dbName) -sessionCatalog.setCurrentDatabase(dbName) +// we assume dbName will not include the catalog prefix. e.g. if you call +// setCurrentDatabase("catalog.db") it will search for a database catalog.db in the catalog. +val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName) +sparkSession.sessionState.catalogManager.setCurrentNamespace(ident.toArray) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 6e6138c91dd..7e4933b3407 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -866,4 +866,33 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf sql(s"CREATE NAMESPACE $qualified") assert(spark.catalog.getDatabase(qualified).name === db) } + + test("three layer namespace compatibility - set current database") { +spark.catalog.setCurrentCatalog("testcat") +// namespace with the same name as catalog +sql("CREATE NAMESPACE testcat.testcat.my_db") +spark.catalog.setCurrentDatabase("testcat.my_db") +assert(spark.catalog.currentDatabase == "testcat.my_db") +// sessionCatalog still reports 'default' as curr
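The key change is that `setCurrentDatabase` now parses its argument as a multi-part identifier and sets the current namespace, instead of requiring a single session-catalog database name. A simplified sketch of the common-case parsing (the real SQL parser also handles backtick-quoted parts such as `` `my.db` ``, which this sketch does not):

```python
# Simplified model of parseMultipartIdentifier for unquoted names:
# split a dotted name into namespace parts.
def parse_multipart_identifier(name):
    return name.split(".")

# setCurrentDatabase("testcat.my_db") can now resolve a two-part
# namespace relative to the current catalog.
print(parse_multipart_identifier("my_db"))          # ['my_db']
print(parse_multipart_identifier("testcat.my_db"))  # ['testcat', 'my_db']
```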
[spark] branch master updated (309d5882bdc -> ed1a3402d2d)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 309d5882bdc [MINOR] Make the 'sep' description better in read_csv of pyspark p… add ed1a3402d2d [SPARK-39483][PYTHON] Construct the schema from `np.dtype` when `createDataFrame` from a NumPy array No new revisions were added by this update. Summary of changes: python/pyspark/sql/catalog.py| 59 ++-- python/pyspark/sql/tests/test_catalog.py | 15 2 files changed, 71 insertions(+), 3 deletions(-)

[spark-website] branch asf-site updated: Organization update (#401)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/spark-website.git The following commit(s) were added to refs/heads/asf-site by this push: new c2ed369e7 Organization update (#401) c2ed369e7 is described below commit c2ed369e77657f17e6617199b9316ba198981312 Author: Ruifeng Zheng AuthorDate: Wed Jun 29 20:33:46 2022 +0800 Organization update (#401) * hello * update committers.html --- committers.md| 2 +- site/committers.html | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/committers.md b/committers.md index 25cd9d8de..50e0ae354 100644 --- a/committers.md +++ b/committers.md @@ -93,7 +93,7 @@ navigation: |Kent Yao|NetEase| |Burak Yavuz|Databricks| |Matei Zaharia|Databricks, Stanford| -|Ruifeng Zheng|JD.com| +|Ruifeng Zheng|Databricks| |Shixiong Zhu|Databricks| Becoming a committer diff --git a/site/committers.html b/site/committers.html index acb8024dd..aad0d3c93 100644 --- a/site/committers.html +++ b/site/committers.html @@ -466,7 +466,7 @@ Ruifeng Zheng - JD.com + Databricks Shixiong Zhu
[spark] branch master updated: [SPARK-39446][MLLIB][FOLLOWUP] Modify ranking metrics for java and python
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new feae21c6445 [SPARK-39446][MLLIB][FOLLOWUP] Modify ranking metrics for java and python feae21c6445 is described below commit feae21c6445f8767bf5f62bb54f6c61a8df4e0c1 Author: uch AuthorDate: Wed Jun 29 17:36:48 2022 +0800 [SPARK-39446][MLLIB][FOLLOWUP] Modify ranking metrics for java and python ### What changes were proposed in this pull request? - Updated `RankingMetrics` for Java and Python - Modified the interface for Java and Python - Added test for Java ### Why are the changes needed? - To expose the change in https://github.com/apache/spark/pull/36843 to Java and Python. - To update the document for Java and Python. ### Does this PR introduce _any_ user-facing change? - Java users can use a JavaRDD of (predicted ranking, ground truth set, relevance value of ground truth set) for `RankingMetrics` ### How was this patch tested? - Added test for Java Closes #37019 from uch/modify_ranking_metrics_for_java_and_python. Authored-by: uch Signed-off-by: Ruifeng Zheng --- .../spark/mllib/evaluation/RankingMetrics.scala| 16 ++--- .../mllib/evaluation/JavaRankingMetricsSuite.java | 27 ++ python/pyspark/mllib/evaluation.py | 14 --- 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala index 87a17f57caf..6ff8262c498 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala @@ -267,12 +267,22 @@ object RankingMetrics { /** * Creates a [[RankingMetrics]] instance (for Java users). 
* @param predictionAndLabels a JavaRDD of (predicted ranking, ground truth set) pairs + *or (predicted ranking, ground truth set, + *relevance value of ground truth set). + *Since 3.4.0, it supports ndcg evaluation with relevance value. */ @Since("1.4.0") - def of[E, T <: jl.Iterable[E]](predictionAndLabels: JavaRDD[(T, T)]): RankingMetrics[E] = { + def of[E, T <: jl.Iterable[E], A <: jl.Iterable[Double]]( + predictionAndLabels: JavaRDD[_ <: Product]): RankingMetrics[E] = { implicit val tag = JavaSparkContext.fakeClassTag[E] -val rdd = predictionAndLabels.rdd.map { case (predictions, labels) => - (predictions.asScala.toArray, labels.asScala.toArray) +val rdd = predictionAndLabels.rdd.map { + case (predictions, labels) => +(predictions.asInstanceOf[T].asScala.toArray, labels.asInstanceOf[T].asScala.toArray) + case (predictions, labels, rels) => +( + predictions.asInstanceOf[T].asScala.toArray, + labels.asInstanceOf[T].asScala.toArray, + rels.asInstanceOf[A].asScala.toArray) } new RankingMetrics(rdd) } diff --git a/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java index 50822c61fdc..4dcb2920610 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java @@ -22,7 +22,9 @@ import java.util.Arrays; import java.util.List; import scala.Tuple2; +import scala.Tuple3; import scala.Tuple2$; +import scala.Tuple3$; import org.junit.Assert; import org.junit.Test; @@ -32,6 +34,8 @@ import org.apache.spark.api.java.JavaRDD; public class JavaRankingMetricsSuite extends SharedSparkSession { private transient JavaRDD, List>> predictionAndLabels; + private transient JavaRDD, List, List>> +predictionLabelsAndRelevance; @Override public void setUp() throws IOException { @@ -43,6 +47,22 @@ public class JavaRankingMetricsSuite extends 
SharedSparkSession { Arrays.asList(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Arrays.asList(1, 2, 3)), Tuple2$.MODULE$.apply( Arrays.asList(1, 2, 3, 4, 5), Arrays.asList())), 2); +predictionLabelsAndRelevance = jsc.parallelize(Arrays.asList( + Tuple3$.MODULE$.apply( +Arrays.asList(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), +Arrays.asList(1, 2, 3, 4, 5), +Arrays.asList(3.0, 2.0, 1.0, 1.0, 1.0) + ), + Tuple3$.MODULE$.apply( +Arrays.asList(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), +Arrays.asList(1, 2, 3), +Arrays.asList(2.0, 0.0, 0.0) + ), + Tuple3$.MO
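The new three-tuple form `(predicted ranking, ground truth set, relevance values)` exists so NDCG can use graded relevance rather than treating every hit as equally valuable. A pure-Python sketch of the graded-DCG idea (an illustration of the metric, not the Spark implementation):

```python
import math

def dcg_at_k(predicted, relevance, k):
    # Discounted cumulative gain: each hit contributes its graded relevance,
    # discounted by log2 of its 1-based rank plus one.
    return sum(
        relevance.get(item, 0.0) / math.log2(rank + 2)
        for rank, item in enumerate(predicted[:k])
    )

# Graded ground truth: item -> relevance value (the third tuple element).
rels = {1: 3.0, 2: 2.0, 3: 1.0}

# Ranking the most relevant item first scores higher than burying it.
good = dcg_at_k([1, 2, 3], rels, k=3)
bad = dcg_at_k([3, 2, 1], rels, k=3)
print(good > bad)  # True
```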
[spark] branch master updated: [SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1f15f2c6ad7 [SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace 1f15f2c6ad7 is described below commit 1f15f2c6ad7ff8e593d39dd264b4a6efa89d67af Author: Ruifeng Zheng AuthorDate: Wed Jun 29 10:00:59 2022 +0800 [SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace ### What changes were proposed in this pull request? Make listColumns be compatible with 3 layer namespace ### Why are the changes needed? for 3 layer namespace compatiblity ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? added UT Closes #37000 from zhengruifeng/sql_3L_list_cols. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../apache/spark/sql/internal/CatalogImpl.scala| 59 -- .../apache/spark/sql/internal/CatalogSuite.scala | 46 + 2 files changed, 102 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 49cb9a3e897..97226736691 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog} -import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, 
IdentifierHelper, TransformHelper} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.types.StructType @@ -208,8 +208,23 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ @throws[AnalysisException]("table does not exist") override def listColumns(tableName: String): Dataset[Column] = { -val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) -listColumns(tableIdent) +// calling `sqlParser.parseTableIdentifier` to parse tableName. If it contains only table name +// and optionally contains a database name(thus a TableIdentifier), then we look up the table in +// sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of +// string as the qualified identifier and resolve the table through SQL analyzer. +try { + val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + if (tableExists(ident.database.orNull, ident.table)) { +listColumns(ident) + } else { +val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) +listColumns(ident) + } +} catch { + case e: org.apache.spark.sql.catalyst.parser.ParseException => +val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) +listColumns(ident) +} } /** @@ -238,6 +253,44 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { CatalogImpl.makeDataset(columns, sparkSession) } + private def listColumns(ident: Seq[String]): Dataset[Column] = { +val plan = UnresolvedTableOrView(ident, "Catalog.listColumns", true) + +val columns = sparkSession.sessionState.executePlan(plan).analyzed match { + case ResolvedTable(_, _, table, _) => +val (partitionColumnNames, bucketSpecOpt) = table.partitioning.toSeq.convertTransforms +val bucketColumnNames = bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil) +table.schema.map { field => + new Column( +name = field.name, +description = 
field.getComment().orNull, +dataType = field.dataType.simpleString, +nullable = field.nullable, +isPartition = partitionColumnNames.contains(field.name), +isBucket = bucketColumnNames.contains(field.name)) +} + + case ResolvedView(identifier, _) => +val catalog = sparkSession.sessionState.catalog +val table = identifier.asTableIdentifier +val schema = catalog.getTempViewOrPermanentTableMetadata(table).schema +schema.map { field => + new Column( +name = field.name, +desc
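The lookup strategy the patch adds to `listColumns` is a two-step fallback: try the legacy 1- or 2-part table identifier against the session catalog first, and only then resolve the name as a multi-part (3-layer) identifier through the analyzer. A rough Python model of that control flow (names hypothetical):

```python
# Sketch of the fallback in CatalogImpl.listColumns: prefer the legacy
# session-catalog resolution, else hand the full multi-part name to the
# analyzer (which understands catalog.namespace.table).
def resolve_table(name, session_tables):
    parts = name.split(".")
    if len(parts) <= 2 and tuple(parts) in session_tables:
        return ("session_catalog", tuple(parts))
    return ("analyzer", tuple(parts))

session_tables = {("default", "t1"), ("t2",)}
print(resolve_table("default.t1", session_tables))
# ('session_catalog', ('default', 't1'))
print(resolve_table("testcat.ns.t1", session_tables))
# ('analyzer', ('testcat', 'ns', 't1'))
```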
[spark] branch master updated (8f744783531 -> e1020a241a0)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 8f744783531 [SPARK-39596][INFRA] Install `ggplot2` for GitHub Action linter job add e1020a241a0 [SPARK-39483][PYTHON] Construct the schema from `np.dtype` when `createDataFrame` from a NumPy array No new revisions were added by this update. Summary of changes: python/pyspark/sql/session.py | 11 +++ python/pyspark/sql/tests/test_arrow.py | 32 python/pyspark/sql/types.py| 25 + 3 files changed, 52 insertions(+), 16 deletions(-)
[spark] branch master updated: [SPARK-39284][PS] Implement Groupby.mad
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d793c5c6858 [SPARK-39284][PS] Implement Groupby.mad d793c5c6858 is described below commit d793c5c6858cb3d89fd981495a85f4c60ae63035 Author: Ruifeng Zheng AuthorDate: Sun Jun 5 09:49:24 2022 +0800 [SPARK-39284][PS] Implement Groupby.mad ### What changes were proposed in this pull request? Implement Groupby.mad ### Why are the changes needed? to increase pandas api coverage ### Does this PR introduce _any_ user-facing change? yes ``` In [6]: pdf = pd.DataFrame({"A": [1, 2, 2, 1, 1], "B": [3, 2, 3, 9, 0], "C": [3, 4, 13, -14, 9]}) In [7]: psdf = ps.from_pandas(pdf) In [8]: pdf.groupby("A")[["B", "C"]].mad() Out[8]: B C A 1 3.33 8.89 2 0.50 4.50 In [9]: psdf.groupby("A")[["B", "C"]].mad() Out[9]: B C A 1 3.33 8.89 2 0.50 4.50 In [10]: pdf.B.groupby(pdf.A).mad() Out[10]: A 13.33 20.50 Name: B, dtype: float64 In [11]: psdf.B.groupby(psdf.A).mad() Out[11]: A 13.33 20.50 Name: B, dtype: float64 ``` ### How was this patch tested? added ut Closes #36660 from zhengruifeng/ps_groupby_mad. Lead-authored-by: Ruifeng Zheng Co-authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/groupby.py| 84 +++-- python/pyspark/pandas/missing/groupby.py| 2 - python/pyspark/pandas/tests/test_groupby.py | 3 ++ 3 files changed, 83 insertions(+), 6 deletions(-) diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index ce8a322c20b..4377ad6a5c9 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -753,6 +753,80 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): bool_to_numeric=True, ) +# TODO: 'axis', 'skipna', 'level' parameter should be implemented. +def mad(self) -> FrameLike: +""" +Compute mean absolute deviation of groups, excluding missing values. + +.. 
versionadded:: 3.4.0 + +Examples + +>>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True], +..."C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]}) + +>>> df.groupby("A").mad() + B C +A +1 0.44 0.44 +2 0.00 0.00 + +>>> df.B.groupby(df.A).mad() +A +10.44 +20.00 +Name: B, dtype: float64 + +See Also + +pyspark.pandas.Series.groupby +pyspark.pandas.DataFrame.groupby +""" +groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))] +internal, agg_columns, sdf = self._prepare_reduce( +groupkey_names=groupkey_names, +accepted_spark_types=(NumericType, BooleanType), +bool_to_numeric=False, +) +psdf: DataFrame = DataFrame(internal) + +if len(psdf._internal.column_labels) > 0: +window = Window.partitionBy(groupkey_names).rowsBetween( +Window.unboundedPreceding, Window.unboundedFollowing +) +new_agg_scols = {} +new_stat_scols = [] +for agg_column in agg_columns: +# it is not able to directly use 'self._reduce_for_stat_function', due to +# 'it is not allowed to use a window function inside an aggregate function'. +# so we need to create temporary columns to compute the 'abs(x - avg(x))' here. +agg_column_name = agg_column._internal.data_spark_column_names[0] +new_agg_column_name = verify_temp_column_name( +psdf._internal.spark_frame, "__tmp_agg_col_{}__".format(agg_column_name) +) +casted_agg_scol = F.col(agg_column_name).cast("double") +new_agg_scols[new_agg_column_name] = F.abs( +casted_agg_scol - F.avg(casted_agg_scol).over(window) +) + new_stat_scols.append(F.avg(F.col(new_agg_column_name)).alias(agg_column_name)) + +sdf = ( +psdf._internal.spark_frame.withColumns(new_agg_scols) +.g
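The per-group quantity `Groupby.mad` computes is the mean absolute deviation, mean(|x − mean(x)|). A pure-Python check against the group numbers shown in the commit message (A == 1 gives B = [3, 9, 0], A == 2 gives B = [2, 3]):

```python
def mad(values):
    # Mean absolute deviation: average distance from the group mean.
    m = sum(values) / len(values)
    return sum(abs(v - m) for v in values) / len(values)

print(round(mad([3, 9, 0]), 6))  # 3.333333  (matches group A == 1)
print(round(mad([2, 3]), 6))     # 0.5       (matches group A == 2)
```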
[spark] branch master updated: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2fd76994860 [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax` 2fd76994860 is described below commit 2fd769948604eff38d90974017971434484897d6 Author: Xinrong Meng AuthorDate: Fri May 27 09:25:21 2022 +0800 [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax` ### What changes were proposed in this pull request? Implement `skipna` of `Series.argmax` ### Why are the changes needed? Increase pandas API coverage. ### Does this PR introduce _any_ user-facing change? Yes. `skipna` of `Series.argmax` is supported as below. ```py >>> s = ps.Series({'Corn Flakes': 100.0, 'Almond Delight': 110.0, 'Unknown': np.nan, ...'Cinnamon Toast Crunch': 120.0, 'Cocoa Puff': 110.0}) >>> s Corn Flakes 100.0 Almond Delight 110.0 UnknownNaN Cinnamon Toast Crunch120.0 Cocoa Puff 110.0 dtype: float64 >>> s.argmax(skipna=True) 3 >>> s.argmax(skipna=False) -1 ``` ### How was this patch tested? Unit tests. Closes #36599 from xinrong-databricks/argmax.skipna. Authored-by: Xinrong Meng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/series.py| 50 +- python/pyspark/pandas/tests/test_series.py | 9 +- 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index 29afcbe956e..653b4812cad 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -6249,13 +6249,21 @@ class Series(Frame, IndexOpsMixin, Generic[T]): ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda _: SF.lit(-1))]), ) -def argmax(self) -> int: +def argmax(self, axis: Axis = None, skipna: bool = True) -> int: """ Return int position of the largest value in the Series. If the maximum is achieved in multiple locations, the first row position is returned. 
+Parameters +-- +axis : {{None}} +Dummy argument for consistency with Series. +skipna : bool, default True +Exclude NA/null values. If the entire Series is NA, the result +will be NA. + Returns --- int @@ -6265,36 +6273,50 @@ class Series(Frame, IndexOpsMixin, Generic[T]): Consider dataset containing cereal calories ->>> s = ps.Series({'Corn Flakes': 100.0, 'Almond Delight': 110.0, +>>> s = ps.Series({'Corn Flakes': 100.0, 'Almond Delight': 110.0, 'Unknown': np.nan, ...'Cinnamon Toast Crunch': 120.0, 'Cocoa Puff': 110.0}) ->>> s # doctest: +SKIP +>>> s Corn Flakes 100.0 Almond Delight 110.0 +UnknownNaN Cinnamon Toast Crunch120.0 Cocoa Puff 110.0 dtype: float64 ->>> s.argmax() # doctest: +SKIP -2 +>>> s.argmax() +3 + +>>> s.argmax(skipna=False) +-1 """ +axis = validate_axis(axis, none_axis=0) +if axis == 1: +raise ValueError("axis can only be 0 or 'index'") sdf = self._internal.spark_frame.select(self.spark.column, NATURAL_ORDER_COLUMN_NAME) +seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__") +sdf = InternalFrame.attach_distributed_sequence_column( +sdf, +seq_col_name, +) +scol = scol_for(sdf, self._internal.data_spark_column_names[0]) + +if skipna: +sdf = sdf.orderBy(scol.desc_nulls_last(), NATURAL_ORDER_COLUMN_NAME) +else: +sdf = sdf.orderBy(scol.desc_nulls_first(), NATURAL_ORDER_COLUMN_NAME) + max_value = sdf.select( -F.max(scol_for(sdf, self._internal.data_spark_column_names[0])), +F.first(scol), F.first(NATURAL_ORDER_COLUMN_NAME), ).head() + if max_value[1] is None: raise ValueError("attempt to get argmax of an empty sequence") elif max_value[0] is None: return -1 -# We should remember the natural sequence started from 0 -seq_col_name = verify_temp_column_name(sdf
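The documented semantics above can be modeled in plain Python: with `skipna=True`, NaNs are ignored and the first position of the maximum is returned; with `skipna=False`, any NaN yields -1 (this is a behavioral sketch, not the distributed implementation, which sorts with `desc_nulls_first`/`desc_nulls_last` over a sequence column):

```python
import math

def argmax(values, skipna=True):
    if skipna:
        # Ignore NaNs; ties resolve to the first occurrence.
        candidates = [(v, i) for i, v in enumerate(values) if not math.isnan(v)]
        if not candidates:
            raise ValueError("attempt to get argmax of an empty sequence")
        return max(candidates, key=lambda t: (t[0], -t[1]))[1]
    if any(math.isnan(v) for v in values):
        return -1  # a NaN "wins" when NaNs are not skipped
    return argmax(values, skipna=True)

calories = [100.0, 110.0, float("nan"), 120.0, 110.0]
print(argmax(calories, skipna=True))   # 3
print(argmax(calories, skipna=False))  # -1
```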
[spark] branch master updated: [SPARK-39300][PS] Move pandasSkewness and pandasKurtosis into pandas.spark.functions
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 283aa9941bb [SPARK-39300][PS] Move pandasSkewness and pandasKurtosis into pandas.spark.functions 283aa9941bb is described below commit 283aa9941bb159b05542d81138d92d5dc79fbde8 Author: Ruifeng Zheng AuthorDate: Thu May 26 19:46:14 2022 +0800 [SPARK-39300][PS] Move pandasSkewness and pandasKurtosis into pandas.spark.functions init ### What changes were proposed in this pull request? `pandasSkewness` and `pandasKurtosis` are used in `generic`,`groupby`,`window`. move them into `SF` for reuse ### Why are the changes needed? code clean up ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing UT Closes #36682 from zhengruifeng/ps_mv_pandas_skew_kurt_to_SF. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/generic.py | 7 ++- python/pyspark/pandas/groupby.py | 8 +--- python/pyspark/pandas/spark/functions.py | 10 ++ python/pyspark/pandas/window.py | 6 ++ 4 files changed, 15 insertions(+), 16 deletions(-) diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py index ce13ae5ad1b..dbfaedc9321 100644 --- a/python/pyspark/pandas/generic.py +++ b/python/pyspark/pandas/generic.py @@ -41,7 +41,6 @@ import numpy as np import pandas as pd from pandas.api.types import is_list_like # type: ignore[attr-defined] -from pyspark import SparkContext from pyspark.sql import Column, functions as F from pyspark.sql.types import ( BooleanType, @@ -1518,8 +1517,7 @@ class Frame(object, metaclass=ABCMeta): ) ) -sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils -return Column(sql_utils.pandasSkewness(spark_column._jc)) +return SF.skew(spark_column) return self._reduce_for_stat_function( skew, @@ -1588,8 +1586,7 @@ class Frame(object, 
metaclass=ABCMeta): ) ) -sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils -return Column(sql_utils.pandasKurtosis(spark_column._jc)) +return SF.kurt(spark_column) return self._reduce_for_stat_function( kurtosis, diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index 03e6a038232..3201d70d417 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -54,7 +54,6 @@ else: _builtin_table = SelectionMixin._builtin_table # type: ignore[attr-defined] -from pyspark import SparkContext from pyspark.sql import Column, DataFrame as SparkDataFrame, Window, functions as F from pyspark.sql.types import ( BooleanType, @@ -748,13 +747,8 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby """ - -def skew(scol: Column) -> Column: -sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils -return Column(sql_utils.pandasSkewness(scol._jc)) - return self._reduce_for_stat_function( -skew, +SF.skew, accepted_spark_types=(NumericType,), bool_to_numeric=True, ) diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py index b7d57b4c3f8..11f9dbbb8c0 100644 --- a/python/pyspark/pandas/spark/functions.py +++ b/python/pyspark/pandas/spark/functions.py @@ -36,6 +36,16 @@ from pyspark.sql.types import ( ) +def skew(col: Column) -> Column: +sc = SparkContext._active_spark_context +return Column(sc._jvm.PythonSQLUtils.pandasSkewness(col._jc)) + + +def kurt(col: Column) -> Column: +sc = SparkContext._active_spark_context +return Column(sc._jvm.PythonSQLUtils.pandasKurtosis(col._jc)) + + def repeat(col: Column, n: Union[int, Column]) -> Column: """ Repeats a string column n times, and returns it as a new string column. 
diff --git a/python/pyspark/pandas/window.py b/python/pyspark/pandas/window.py index 5c21e31f377..2808f72fd3c 100644 --- a/python/pyspark/pandas/window.py +++ b/python/pyspark/pandas/window.py @@ -121,20 +121,18 @@ class RollingAndExpanding(Generic[FrameLike], metaclass=ABCMeta): def skew(self) -> FrameLike: def skew(scol: Column) -> Column: -sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils return F.when( F.row_number().over(self._unbounded_window) >= self._min_periods, -Column(sq
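The new `SF.skew`/`SF.kurt` helpers are thin wrappers around the JVM-side `pandasSkewness`/`pandasKurtosis`. For reference, the pandas-compatible definition of kurtosis those implement (adjusted Fisher–Pearson excess kurtosis) can be sketched in pure Python; this illustrates the statistic, not the Spark code:

```python
def pandas_kurt(values):
    # Adjusted Fisher-Pearson excess kurtosis, the definition pandas uses
    # (requires at least 4 values).
    n = len(values)
    m = sum(values) / n
    m2 = sum((v - m) ** 2 for v in values) / n
    m4 = sum((v - m) ** 4 for v in values) / n
    g2 = m4 / m2**2 - 3.0
    return ((n + 1) * g2 + 6.0) * (n - 1) / ((n - 2) * (n - 3))

print(round(pandas_kurt([1, 2, 3, 4, 5]), 6))  # -1.2
```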
[spark] branch master updated: [SPARK-39268][SQL] AttachDistributedSequenceExec do not checkpoint childRDD with single partition
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f673ebd8afc [SPARK-39268][SQL] AttachDistributedSequenceExec do not checkpoint childRDD with single partition f673ebd8afc is described below commit f673ebd8afc94a3b434a0156b61366fede80b8f9 Author: Ruifeng Zheng AuthorDate: Thu May 26 12:30:25 2022 +0800 [SPARK-39268][SQL] AttachDistributedSequenceExec do not checkpoint childRDD with single partition ### What changes were proposed in this pull request? do not checkpoint child rdd when it only has 1 partition ### Why are the changes needed? avoid necessary checkpoint when child rdd only has 1 partition, `zipWithIndex` will not trigger an action ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing suites Closes #36648 from zhengruifeng/sql_do_not_checkpoint_with_single_partition. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/tests/test_groupby.py | 9 ++--- .../sql/execution/python/AttachDistributedSequenceExec.scala | 11 --- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py index 045cbaf5274..ac1e73f9d5d 100644 --- a/python/pyspark/pandas/tests/test_groupby.py +++ b/python/pyspark/pandas/tests/test_groupby.py @@ -2256,9 +2256,12 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils): acc += 1 return np.sum(x) -actual = psdf.groupby("d").apply(sum_with_acc_frame).sort_index() +actual = psdf.groupby("d").apply(sum_with_acc_frame) actual.columns = ["d", "v"] -self.assert_eq(actual, pdf.groupby("d").apply(sum).sort_index().reset_index(drop=True)) +self.assert_eq( +actual.to_pandas().sort_index(), +pdf.groupby("d").apply(sum).sort_index().reset_index(drop=True), +) self.assert_eq(acc.value, 2) def sum_with_acc_series(x) -> np.float64: @@ -2267,7 +2270,7 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils): return np.sum(x) self.assert_eq( -psdf.groupby("d")["v"].apply(sum_with_acc_series).sort_index(), + psdf.groupby("d")["v"].apply(sum_with_acc_series).to_pandas().sort_index(), pdf.groupby("d")["v"].apply(sum).sort_index().reset_index(drop=True), ) self.assert_eq(acc.value, 4) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala index 203fb6d7d50..5f722826fc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala @@ -41,9 +41,14 @@ case class AttachDistributedSequenceExec( override def outputPartitioning: Partitioning = child.outputPartitioning override protected def doExecute(): 
RDD[InternalRow] = { -child.execute().map(_.copy()) -.localCheckpoint() // to avoid execute multiple jobs. zipWithIndex launches a Spark job. -.zipWithIndex().mapPartitions { iter => +val childRDD = child.execute().map(_.copy()) +val checkpointed = if (childRDD.getNumPartitions > 1) { + // to avoid execute multiple jobs. zipWithIndex launches a Spark job. + childRDD.localCheckpoint() +} else { + childRDD +} +checkpointed.zipWithIndex().mapPartitions { iter => val unsafeProj = UnsafeProjection.create(output, output) val joinedRow = new JoinedRow val unsafeRowWriter = - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
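The reason `zipWithIndex` launches a job is that computing global indices needs the element count of every earlier partition to derive per-partition offsets; that up-front count is what the `localCheckpoint` protects from recomputation. With a single partition every offset is zero, so neither the extra job nor the checkpoint is needed. A pure-Python sketch of the offset logic:

```python
def zip_with_index(partitions):
    # Global indices require per-partition counts first -- on a real RDD
    # this counting pass is a separate Spark job. With one partition,
    # every global index is just the local position.
    counts = [len(p) for p in partitions]                    # the "extra job"
    offsets = [sum(counts[:i]) for i in range(len(counts))]  # [0, c0, c0+c1, ...]
    return [
        (item, offsets[pid] + pos)
        for pid, part in enumerate(partitions)
        for pos, item in enumerate(part)
    ]

print(zip_with_index([["a", "b"], ["c"]]))  # [('a', 0), ('b', 1), ('c', 2)]
print(zip_with_index([["a", "b", "c"]]))    # [('a', 0), ('b', 1), ('c', 2)]
```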
[spark] branch master updated: [SPARK-39246][PS] Implement Groupby.skew
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d165de8c04c [SPARK-39246][PS] Implement Groupby.skew d165de8c04c is described below commit d165de8c04c41de5c67925cf670b2d7211c4da68 Author: Ruifeng Zheng AuthorDate: Sun May 22 13:42:12 2022 +0800 [SPARK-39246][PS] Implement Groupby.skew ### What changes were proposed in this pull request? Implement Groupby.skew ### Why are the changes needed? for api coverage ### Does this PR introduce _any_ user-facing change? yes, new api added ``` In [4]: df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True], "C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]}) In [5]: df.groupby("A").skew() Out[5]: B C A 1 -1.732051 1.732051 2 NaN NaN ``` ### How was this patch tested? added UT Closes #36624 from zhengruifeng/ps_groupby_skew_kurt. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/groupby.py| 34 + python/pyspark/pandas/missing/groupby.py| 2 -- python/pyspark/pandas/tests/test_groupby.py | 2 +- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index d1cff8e960d..03e6a038232 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -54,6 +54,7 @@ else: _builtin_table = SelectionMixin._builtin_table # type: ignore[attr-defined] +from pyspark import SparkContext from pyspark.sql import Column, DataFrame as SparkDataFrame, Window, functions as F from pyspark.sql.types import ( BooleanType, @@ -725,6 +726,39 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): bool_to_numeric=True, ) +def skew(self) -> FrameLike: +""" +Compute skewness of groups, excluding missing values. + +.. 
versionadded:: 3.4.0 + +Examples + +>>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True], +..."C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]}) + +>>> df.groupby("A").skew() + B C +A +1 -1.732051 1.732051 +2 NaN NaN + +See Also + +pyspark.pandas.Series.groupby +pyspark.pandas.DataFrame.groupby +""" + +def skew(scol: Column) -> Column: +sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils +return Column(sql_utils.pandasSkewness(scol._jc)) + +return self._reduce_for_stat_function( +skew, +accepted_spark_types=(NumericType,), +bool_to_numeric=True, +) + # TODO: skipna should be implemented. def all(self) -> FrameLike: """ diff --git a/python/pyspark/pandas/missing/groupby.py b/python/pyspark/pandas/missing/groupby.py index d0867e4982f..3ea443ebd6e 100644 --- a/python/pyspark/pandas/missing/groupby.py +++ b/python/pyspark/pandas/missing/groupby.py @@ -52,7 +52,6 @@ class MissingPandasLikeDataFrameGroupBy: ngroups = _unsupported_property("ngroups") plot = _unsupported_property("plot") quantile = _unsupported_property("quantile") -skew = _unsupported_property("skew") tshift = _unsupported_property("tshift") # Deprecated properties @@ -87,7 +86,6 @@ class MissingPandasLikeSeriesGroupBy: ngroups = _unsupported_property("ngroups") plot = _unsupported_property("plot") quantile = _unsupported_property("quantile") -skew = _unsupported_property("skew") tshift = _unsupported_property("tshift") # Deprecated properties diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py index 1375d7a9bc0..045cbaf5274 100644 --- a/python/pyspark/pandas/tests/test_groupby.py +++ b/python/pyspark/pandas/tests/test_groupby.py @@ -266,7 +266,7 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils): funcs = [ ((True, False), ["sum", "min", "max", "count", "first", "last"]), ((True, True), ["mean"]), -((False, False), ["var", "std"]), +((False, False), ["var", "std", "skew"]), ] funcs = [(check_exact, almost, f) for 
(check_exact, almost), fs in funcs for f in fs] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
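The `skew` helper above delegates to the JVM-side `pandasSkewness`; as a hypothetical reference implementation, the bias-adjusted (Fisher-Pearson) sample skewness that pandas reports can be sketched in plain Python. For group `A == 1` in the commit's doctest, column B is `[1, 0, 1]`, which yields -sqrt(3), about -1.732051, matching the shown output.

```python
import math

def sample_skew(xs):
    """Bias-adjusted Fisher-Pearson sample skewness (pandas-style):
    G1 = sqrt(n*(n-1)) / (n-2) * m3 / m2**1.5,
    where m2 and m3 are the second and third central moments."""
    n = len(xs)
    if n < 3:
        return float("nan")
    mean = sum(xs) / n
    m2 = sum((x - mean) ** 2 for x in xs) / n
    m3 = sum((x - mean) ** 3 for x in xs) / n
    if m2 == 0:
        return float("nan")
    return math.sqrt(n * (n - 1)) / (n - 2) * m3 / m2 ** 1.5

print(round(sample_skew([1, 0, 1]), 6))  # -1.732051
print(round(sample_skew([3, 3, 4]), 6))  # 1.732051
```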
[spark] branch master updated (49562f41678 -> 291d155b3c5)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 49562f41678 [SPARK-39167][SQL] Throw an exception w/ an error class for multiple rows from a subquery used as an expression add 291d155b3c5 [SPARK-39129][PS] Implement GroupBy.ewm No new revisions were added by this update. Summary of changes: .../source/reference/pyspark.pandas/groupby.rst| 1 + python/pyspark/pandas/groupby.py | 70 ++- python/pyspark/pandas/missing/window.py| 12 ++ python/pyspark/pandas/tests/test_ewm.py| 212 + python/pyspark/pandas/tests/test_window.py | 72 +++ python/pyspark/pandas/usage_logging/__init__.py| 18 +- python/pyspark/pandas/window.py| 112 +++ 7 files changed, 494 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
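For context on what `GroupBy.ewm` computes per group: the exponentially weighted mean (in the pandas default `adjust=True` form, an assumption here about the targeted semantics) is y_t = sum_i (1-alpha)^i * x_{t-i} / sum_i (1-alpha)^i. A minimal plain-Python sketch of that recurrence:

```python
def ewm_mean(xs, alpha):
    """Exponentially weighted moving mean (pandas adjust=True form):
    y_t = sum((1-alpha)**i * x[t-i]) / sum((1-alpha)**i), i = 0..t.
    Both numerator and denominator satisfy the same decay recurrence."""
    out = []
    num = 0.0  # running weighted sum of values
    den = 0.0  # running sum of weights
    for x in xs:
        num = num * (1 - alpha) + x
        den = den * (1 - alpha) + 1.0
        out.append(num / den)
    return out

print(ewm_mean([1.0, 2.0, 3.0], alpha=0.5))
```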
[spark] branch master updated (2349f74866a -> a24dfd697c7)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 2349f74866a [SPARK-39121][K8S][DOCS] Fix format error on running-on-kubernetes doc add a24dfd697c7 [SPARK-39114][ML] ml.optim.aggregator avoid re-allocating buffers No new revisions were added by this update. Summary of changes: .../ml/optim/aggregator/AFTBlockAggregator.scala | 16 +-- .../aggregator/BinaryLogisticBlockAggregator.scala | 14 -- .../ml/optim/aggregator/HingeBlockAggregator.scala | 16 +-- .../ml/optim/aggregator/HuberBlockAggregator.scala | 16 +-- .../aggregator/LeastSquaresBlockAggregator.scala | 14 +- .../MultinomialLogisticBlockAggregator.scala | 53 +- 6 files changed, 104 insertions(+), 25 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
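The aggregator change above avoids re-allocating temporary buffers on every block of training data. A generic plain-Python sketch of the pattern (class and method names are hypothetical, not Spark's): allocate the scratch array once at construction and reset it per call instead of allocating inside the hot loop.

```python
class Aggregator:
    """Sketch of the buffer-reuse pattern: one scratch array allocated
    at construction and reused for every block, instead of allocating
    a fresh array inside add()."""

    def __init__(self, num_features):
        self.grad = [0.0] * num_features
        self._scratch = [0.0] * num_features  # reused across add() calls

    def add(self, rows):
        buf = self._scratch
        for i in range(len(buf)):  # reset in place, don't reallocate
            buf[i] = 0.0
        for row in rows:
            for i, v in enumerate(row):
                buf[i] += v
        for i in range(len(buf)):
            self.grad[i] += buf[i]
        return self

agg = Aggregator(3).add([[1, 2, 3], [4, 5, 6]])
print(agg.grad)  # [5.0, 7.0, 9.0]
```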
[spark] branch master updated (c1c3f4f -> 669c625)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from c1c3f4f [SPARK-38679][SQL][TESTS][FOLLOW-UP] Add numPartitions parameter to TaskContextImpl at SubexpressionEliminationSuite add 669c625 [SPARK-38588][ML][FOLLOWUP] fix ga failed at `Bernoulli: check vectors` No new revisions were added by this update. Summary of changes: .../test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (3858bf0 -> b902936)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 3858bf0 [SPARK-38063][SQL] Support split_part Function add b902936 [SPARK-38588][ML] Validate input dataset of ml.classification No new revisions were added by this update. Summary of changes: .../spark/ml/classification/Classifier.scala | 7 +- .../ml/classification/DecisionTreeClassifier.scala | 13 ++- .../spark/ml/classification/FMClassifier.scala | 13 ++- .../spark/ml/classification/GBTClassifier.scala| 22 ++-- .../apache/spark/ml/classification/LinearSVC.scala | 9 +- .../ml/classification/LogisticRegression.scala | 11 +- .../MultilayerPerceptronClassifier.scala | 15 ++- .../spark/ml/classification/NaiveBayes.scala | 101 ++--- .../ml/classification/RandomForestClassifier.scala | 13 ++- .../org/apache/spark/ml/util/DatasetUtils.scala| 70 +++- .../spark/mllib/classification/NaiveBayes.scala| 4 +- .../spark/ml/classification/ClassifierSuite.scala | 8 +- .../DecisionTreeClassifierSuite.scala | 6 + .../ml/classification/FMClassifierSuite.scala | 5 + .../ml/classification/GBTClassifierSuite.scala | 38 ++- .../spark/ml/classification/LinearSVCSuite.scala | 6 + .../classification/LogisticRegressionSuite.scala | 6 + .../MultilayerPerceptronClassifierSuite.scala | 14 +++ .../spark/ml/classification/NaiveBayesSuite.scala | 83 +- .../RandomForestClassifierSuite.scala | 6 + .../scala/org/apache/spark/ml/util/MLTest.scala| 126 - 21 files changed, 432 insertions(+), 144 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (a342214 -> 5bb001b)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from a342214 [SPARK-38535][SQL] Add the `datetimeUnit` enum and use it in `TIMESTAMPADD/DIFF` add 5bb001b [SPARK-36967][FOLLOWUP][CORE] Report accurate shuffle block size if its skewed No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala | 2 +- core/src/main/scala/org/apache/spark/util/Utils.scala| 5 +++-- core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala | 2 +- .../org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala | 4 ++-- 4 files changed, 7 insertions(+), 6 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (8e44791 -> 2844a18)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 8e44791 [SPARK-38504][SQL] Cannot read TimestampNTZ as TimestampLTZ add 2844a18 [SPARK-38360][SQL][AVRO][SS][FOLLOWUP] Replace `TreeNode.collectFirst` + `isDefined/isEmpty` with `exists` No new revisions were added by this update. Summary of changes: .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 7 --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 15 ++- .../spark/sql/catalyst/analysis/StreamingJoinHelper.scala | 2 +- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala| 7 --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 5 - .../org/apache/spark/sql/FileBasedDataSourceSuite.scala | 7 --- .../src/test/scala/org/apache/spark/sql/QueryTest.scala | 11 ++- .../spark/sql/execution/WholeStageCodegenSuite.scala | 10 ++ .../sql/streaming/test/DataStreamTableAPISuite.scala | 8 +++- .../scala/org/apache/spark/sql/hive/client/HiveShim.scala | 9 + 10 files changed, 43 insertions(+), 38 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-35707][ML] optimize sparse GEMM by skipping bound checking
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5c96d64 [SPARK-35707][ML] optimize sparse GEMM by skipping bound checking 5c96d64 is described below commit 5c96d643eeb4ca1ad7e4e9cc711971203fcacc6c Author: Ruifeng Zheng AuthorDate: Wed Jun 16 08:57:27 2021 +0800 [SPARK-35707][ML] optimize sparse GEMM by skipping bound checking ### What changes were proposed in this pull request? Sparse gemm uses the method `DenseMatrix.apply` to access the values, which can be optimized by skipping the bound and `isTransposed` checks ``` override def apply(i: Int, j: Int): Double = values(index(i, j)) private[ml] def index(i: Int, j: Int): Int = { require(i >= 0 && i < numRows, s"Expected 0 <= i < $numRows, got i = $i.") require(j >= 0 && j < numCols, s"Expected 0 <= j < $numCols, got j = $j.") if (!isTransposed) i + numRows * j else j + numCols * i } ``` ### Why are the changes needed? to improve performance, about 15% faster in the designed case ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing testsuite and additional performance test Closes #32857 from zhengruifeng/gemm_opt_index. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala | 4 ++-- mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala| 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index 0bc8b2f..d1255de 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -480,7 +480,7 @@ private[spark] object BLAS extends Serializable { val indEnd = AcolPtrs(rowCounterForA + 1) var sum = 0.0 while (i < indEnd) { - sum += Avals(i) * B(ArowIndices(i), colCounterForB) + sum += Avals(i) * Bvals(colCounterForB + nB * ArowIndices(i)) i += 1 } val Cindex = Cstart + rowCounterForA @@ -522,7 +522,7 @@ private[spark] object BLAS extends Serializable { while (colCounterForA < kA) { var i = AcolPtrs(colCounterForA) val indEnd = AcolPtrs(colCounterForA + 1) -val Bval = B(colCounterForA, colCounterForB) * alpha +val Bval = Bvals(colCounterForB + nB * colCounterForA) * alpha while (i < indEnd) { Cvals(Cstart + ArowIndices(i)) += Avals(i) * Bval i += 1 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala index e38cfe4..5cbec53 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala @@ -462,7 +462,7 @@ private[spark] object BLAS extends Serializable with Logging { val indEnd = AcolPtrs(rowCounterForA + 1) var sum = 0.0 while (i < indEnd) { - sum += Avals(i) * B(ArowIndices(i), colCounterForB) + sum += Avals(i) * Bvals(colCounterForB + nB * ArowIndices(i)) i += 1 } val Cindex = Cstart + rowCounterForA @@ -504,7 +504,7 @@ private[spark] object BLAS extends Serializable with Logging { while (colCounterForA < kA) { var i 
= AcolPtrs(colCounterForA) val indEnd = AcolPtrs(colCounterForA + 1) -val Bval = B(colCounterForA, colCounterForB) * alpha +val Bval = Bvals(colCounterForB + nB * colCounterForA) * alpha while (i < indEnd) { Cvals(Cstart + ArowIndices(i)) += Avals(i) * Bval i += 1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
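The diff above replaces `B(ArowIndices(i), colCounterForB)` with direct flat indexing into B's values array, avoiding `DenseMatrix.apply`'s bound and transpose checks on every multiply-accumulate. A plain-Python sketch of the same sparse-times-dense GEMM structure, with A in CSC form and B read by raw index arithmetic (the storage layout here is my illustration, not necessarily the exact layout in the Spark branch shown):

```python
def csc_gemm(m, k, n, col_ptrs, row_idx, vals, b):
    """C = A * B where A is m x k in CSC form (col_ptrs, row_idx, vals)
    and B is k x n stored flat in column-major order (B[i,j] = b[i + k*j]).
    B's values are read by direct index arithmetic, the analogue of
    skipping DenseMatrix.apply's checks in the patch above."""
    c = [0.0] * (m * n)
    for j in range(n):        # column of B / C
        for col in range(k):  # column of A == row of B
            bval = b[col + k * j]
            if bval == 0.0:
                continue
            for p in range(col_ptrs[col], col_ptrs[col + 1]):
                c[row_idx[p] + m * j] += vals[p] * bval
    return c

# A = diag(1, 2) in CSC; B = [[1, 2], [3, 4]] column-major
print(csc_gemm(2, 2, 2, [0, 1, 2], [0, 1], [1.0, 2.0], [1.0, 3.0, 2.0, 4.0]))
# [1.0, 6.0, 2.0, 8.0]  (i.e. C = [[1, 2], [6, 8]] column-major)
```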
[spark] branch master updated: [SPARK-35666][ML] gemv skip array shape checking
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2802ac3 [SPARK-35666][ML] gemv skip array shape checking 2802ac3 is described below commit 2802ac321f7378c8a9113338c9872b8fd332de6b Author: Ruifeng Zheng AuthorDate: Wed Jun 16 08:54:34 2021 +0800 [SPARK-35666][ML] gemv skip array shape checking ### What changes were proposed in this pull request? In existing impls, it is a common case that the vector/matrix needs to be sliced/copied just to match shapes, which makes the logic complex and introduces the extra cost of slicing & copying. ### Why are the changes needed? 1, avoid slicing and copying due to shape checking; 2, simplify the usages; ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing testsuites Closes #32805 from zhengruifeng/new_blas_func_for_agg. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../scala/org/apache/spark/ml/linalg/BLAS.scala| 60 -- .../ml/optim/aggregator/AFTBlockAggregator.scala | 24 +++-- .../aggregator/BinaryLogisticBlockAggregator.scala | 34 ++-- .../ml/optim/aggregator/HingeBlockAggregator.scala | 34 ++-- .../ml/optim/aggregator/HuberBlockAggregator.scala | 24 +++-- .../aggregator/LeastSquaresBlockAggregator.scala | 18 +++ 6 files changed, 84 insertions(+), 110 deletions(-) diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index 5a6bee3..0bc8b2f 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -536,6 +536,32 @@ private[spark] object BLAS extends Serializable { } /** + * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows] + */ + def gemv( + alpha: Double, + A: Matrix, + x: Array[Double], + beta: Double, + y: Array[Double]): Unit = { +require(A.numCols <= x.length, + s"The columns of A don't match the number of elements of x. A: ${A.numCols}, x: ${x.length}") +require(A.numRows <= y.length, + s"The rows of A don't match the number of elements of y. A: ${A.numRows}, y:${y.length}") +if (alpha == 0.0 && beta == 1.0) { + // gemv: alpha is equal to 0 and beta is equal to 1. Returning y. + return +} else if (alpha == 0.0) { + getBLAS(A.numRows).dscal(A.numRows, beta, y, 1) +} else { + A match { +case smA: SparseMatrix => gemvImpl(alpha, smA, x, beta, y) +case dmA: DenseMatrix => gemvImpl(alpha, dmA, x, beta, y) + } +} + } + + /** * y := alpha * A * x + beta * y * @param alpha a scalar to scale the multiplication A * x. * @param A the matrix A that will be left multiplied to x. Size of m x n. 
@@ -585,11 +611,24 @@ private[spark] object BLAS extends Serializable { x: DenseVector, beta: Double, y: DenseVector): Unit = { +gemvImpl(alpha, A, x.values, beta, y.values) + } + + /** + * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows] + * For `DenseMatrix` A. + */ + private def gemvImpl( + alpha: Double, + A: DenseMatrix, + xValues: Array[Double], + beta: Double, + yValues: Array[Double]): Unit = { val tStrA = if (A.isTransposed) "T" else "N" val mA = if (!A.isTransposed) A.numRows else A.numCols val nA = if (!A.isTransposed) A.numCols else A.numRows -nativeBLAS.dgemv(tStrA, mA, nA, alpha, A.values, mA, x.values, 1, beta, - y.values, 1) +nativeBLAS.dgemv(tStrA, mA, nA, alpha, A.values, mA, xValues, 1, beta, + yValues, 1) } /** @@ -715,8 +754,19 @@ private[spark] object BLAS extends Serializable { x: DenseVector, beta: Double, y: DenseVector): Unit = { -val xValues = x.values -val yValues = y.values +gemvImpl(alpha, A, x.values, beta, y.values) + } + + /** + * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows] + * For `SparseMatrix` A. + */ + private def gemvImpl( + alpha: Double, + A: SparseMatrix, + xValues: Array[Double], + beta: Double, + yValues: Array[Double]): Unit = { val mA: Int = A.numRows val nA: Int = A.numCols @@ -738,7 +788,7 @@ private[spark] object BLAS extends Serializable { rowCounter += 1 } } else { - if (beta != 1.0) scal(beta, y) + if (beta != 1.0) getBLAS(mA).dscal(mA, beta, yValues, 1) // Perform matrix-vector multiplication and add to y var colCounterForA = 0 while (colCounterForA <
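The new overload above operates on raw arrays and only requires `x` and `y` to be at least as long as A's dimensions (`A.numCols <= x.length`, `A.numRows <= y.length`), so callers can pass long scratch buffers without slicing or copying. A plain-Python sketch of that prefix semantics for the dense case (function name and column-major layout are my assumptions for illustration):

```python
def gemv_prefix(alpha, a, m, n, x, beta, y):
    """y[0:m] := alpha * A @ x[0:n] + beta * y[0:m], where A is m x n
    column-major in `a`. x and y may be longer than n and m; only their
    prefixes are touched, so long scratch buffers can be reused unsliced."""
    assert n <= len(x) and m <= len(y)
    for i in range(m):
        y[i] *= beta
    for j in range(n):
        xj = x[j]
        for i in range(m):
            y[i] += alpha * a[i + m * j] * xj

# A = [[1, 2], [3, 4]] (column-major); x and y deliberately longer than needed
x = [1.0, 1.0, 99.0]
y = [0.0, 0.0, 7.0]
gemv_prefix(1.0, [1.0, 3.0, 2.0, 4.0], 2, 2, x, 0.0, y)
print(y)  # [3.0, 7.0, 7.0] -- the tail element is left untouched
```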
[spark] branch master updated (2a56cc3 -> 8c4b535)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 2a56cc3 [SPARK-35761][PYTHON] Use type-annotation based pandas_udf or avoid specifying udf types to suppress warnings add 8c4b535 [SPARK-35678][ML] add a common softmax function No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/ml/impl/Utils.scala | 31 + .../ml/classification/LogisticRegression.scala | 29 ++-- .../MultinomialLogisticBlockAggregator.scala | 32 -- 3 files changed, 40 insertions(+), 52 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
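The "common softmax function" added in `ml.impl.Utils` above is not shown in this summary; as a sketch of the standard technique such a helper typically uses, here is a numerically stable softmax in plain Python (subtracting the max before exponentiating avoids overflow without changing the result, since softmax is shift-invariant):

```python
import math

def softmax(xs):
    """Numerically stable softmax: subtract max(xs) before exp so that
    the largest exponent is 0, preventing overflow for large inputs."""
    m = max(xs)
    exps = [math.exp(x - m) for x in xs]
    s = sum(exps)
    return [e / s for e in exps]

probs = softmax([1.0, 2.0, 3.0])
print(sum(probs))  # ~1.0
print(softmax([1000.0, 1001.0]) == softmax([0.0, 1.0]))  # True: shift-invariant
```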
[spark] branch master updated (d8e37a8 -> ffc61c6)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from d8e37a8 [SPARK-35100][ML][FOLLOWUP] AFT cleanup add ffc61c6 [SPARK-35619][ML] Refactor LinearRegression - make huber support virtual centering No new revisions were added by this update. Summary of changes: .../ml/optim/aggregator/HuberAggregator.scala | 250 ...Aggregator.scala => HuberBlockAggregator.scala} | 82 +++--- .../optim/aggregator/LeastSquaresAggregator.scala | 314 - .../aggregator/LeastSquaresBlockAggregator.scala | 128 + .../spark/ml/regression/LinearRegression.scala | 97 --- .../ml/optim/aggregator/HuberAggregatorSuite.scala | 197 - ...Suite.scala => HuberBlockAggregatorSuite.scala} | 113 +--- ...cala => LeastSquaresBlockAggregatorSuite.scala} | 135 - 8 files changed, 349 insertions(+), 967 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala copy mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/{HingeBlockAggregator.scala => HuberBlockAggregator.scala} (70%) delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala create mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala delete mode 100644 mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala copy mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/{HingeBlockAggregatorSuite.scala => HuberBlockAggregatorSuite.scala} (68%) rename mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/{LeastSquaresAggregatorSuite.scala => LeastSquaresBlockAggregatorSuite.scala} (51%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (50f7686 -> d8e37a8)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 50f7686 [SPARK-35599][PYTHON] Adjust `check_exact` parameter for older pd.testing add d8e37a8 [SPARK-35100][ML][FOLLOWUP] AFT cleanup No new revisions were added by this update. Summary of changes: .../apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala | 11 +-- 1 file changed, 5 insertions(+), 6 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (dbf0b50 -> c2de0a6)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from dbf0b50 [SPARK-35560][SQL] Remove redundant subexpression evaluation in nested subexpressions add c2de0a6 [SPARK-35100][ML] Refactor AFT - support virtual centering No new revisions were added by this update. Summary of changes: .../spark/ml/optim/aggregator/AFTAggregator.scala | 256 - ...ckAggregator.scala => AFTBlockAggregator.scala} | 117 -- .../ml/regression/AFTSurvivalRegression.scala | 52 +++-- 3 files changed, 87 insertions(+), 338 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala copy mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/{HingeBlockAggregator.scala => AFTBlockAggregator.scala} (52%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (bcac733 -> 1f150b9)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from bcac733 [SPARK-35200][CORE] Avoid to recompute the pending speculative tasks in the ExecutorAllocationManager and remove some unnecessary code add 1f150b9 [SPARK-35024][ML] Refactor LinearSVC - support virtual centering No new revisions were added by this update. Summary of changes: R/pkg/tests/fulltests/test_mllib_classification.R | 6 +- .../apache/spark/ml/classification/LinearSVC.scala | 46 - .../ml/classification/LogisticRegression.scala | 10 +- .../aggregator/BinaryLogisticBlockAggregator.scala | 6 +- .../ml/optim/aggregator/HingeAggregator.scala | 212 - ...Aggregator.scala => HingeBlockAggregator.scala} | 70 +++ .../MultinomialLogisticBlockAggregator.scala | 2 +- .../spark/ml/classification/LinearSVCSuite.scala | 36 +--- .../ml/optim/aggregator/HingeAggregatorSuite.scala | 189 -- ...Suite.scala => HingeBlockAggregatorSuite.scala} | 109 --- python/pyspark/ml/classification.py| 8 +- python/pyspark/ml/tests/test_training_summary.py | 12 +- 12 files changed, 131 insertions(+), 575 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala copy mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/{BinaryLogisticBlockAggregator.scala => HingeBlockAggregator.scala} (69%) delete mode 100644 mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala copy mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/{BinaryLogisticBlockAggregatorSuite.scala => HingeBlockAggregatorSuite.scala} (70%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
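The hinge aggregators refactored above compute the loss LinearSVC optimizes. As a minimal plain-Python sketch (the {0, 1} to {-1, +1} label mapping follows Spark ML's convention; the function name is hypothetical):

```python
def hinge_loss(weights, intercept, x, label):
    """Binary hinge loss for a single example, with labels in {0, 1}:
    loss = max(0, 1 - (2*label - 1) * margin), margin = w.x + b."""
    margin = sum(w * v for w, v in zip(weights, x)) + intercept
    y = 2.0 * label - 1.0  # map {0, 1} -> {-1, +1}
    return max(0.0, 1.0 - y * margin)

print(hinge_loss([1.0, -1.0], 0.0, [2.0, 0.5], 1))   # 0.0 (margin 1.5 >= 1)
print(hinge_loss([1.0, -1.0], 0.0, [0.5, 0.25], 1))  # 0.75
```

The "virtual centering" in these refactors refers to computing as if the features were mean-centered without materializing a dense centered copy of the (possibly sparse) data; the sketch above omits that detail.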