[spark] branch master updated: [SPARK-42484][SQL] UnsafeRowUtils better error message
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new eb24dbd305b [SPARK-42484][SQL] UnsafeRowUtils better error message eb24dbd305b is described below commit eb24dbd305b9e46382d49d43202d0c3a89fd324f Author: Wei Liu AuthorDate: Thu Feb 23 14:08:37 2023 +0800 [SPARK-42484][SQL] UnsafeRowUtils better error message ### What changes were proposed in this pull request? Showing the essential information when throwing `InvalidUnsafeRowException`. Including where the check failed, and status of the `unsafeRow` and `expectedSchema` Example output: ``` [UnsafeRowStatus] expectedSchema: StructType(StructField(key1,IntegerType,false),StructField(key2,IntegerType,false),StructField(sum(key1),IntegerType,false),StructField(sum(key2),IntegerType,false)), expectedSchemaNumFields: 4, numFields: 4, bitSetWidthInBytes: 8, rowSizeInBytes: 40 fieldStatus: [UnsafeRowFieldStatus] index: 0, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1 [UnsafeRowFieldStatus] index: 1, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1 [UnsafeRowFieldStatus] index: 2, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1 [UnsafeRowFieldStatus] index: 3, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1 ``` ### Why are the changes needed? Right now if such error happens, it's hard to track where it errored, and what the misbehaved row & schema looks like. With this change this information is clearer. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests Closes #40073 from WweiL/SPARK-42484-better-log-unsaferowUtil. 
Authored-by: Wei Liu Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/util/UnsafeRowUtils.scala | 83 ++ .../sql/catalyst/util/UnsafeRowUtilsSuite.scala| 26 --- .../sql/execution/streaming/state/StateStore.scala | 18 ++--- .../spark/sql/hive/execution/SQLQuerySuite.scala | 6 +- 4 files changed, 101 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala index 2791f404813..81b06cb466c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala @@ -46,15 +46,21 @@ object UnsafeRowUtils { * check if the unused bits in the field are all zeros. The UnsafeRowWriter's write() methods * make this guarantee. * - Check the total length of the row. + * @param row The input UnsafeRow to be validated + * @param expectedSchema The expected schema that should match with the UnsafeRow + * @return None if all the checks pass. 
An error message if the row is not matched with the schema */ - def validateStructuralIntegrity(row: UnsafeRow, expectedSchema: StructType): Boolean = { + private def validateStructuralIntegrityWithReasonImpl( + row: UnsafeRow, expectedSchema: StructType): Option[String] = { if (expectedSchema.fields.length != row.numFields) { - return false + return Some(s"Field length mismatch: " + +s"expected: ${expectedSchema.fields.length}, actual: ${row.numFields}") } val bitSetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields) val rowSizeInBytes = row.getSizeInBytes if (expectedSchema.fields.length > 0 && bitSetWidthInBytes >= rowSizeInBytes) { - return false + return Some(s"rowSizeInBytes should not exceed bitSetWidthInBytes, " + +s"bitSetWidthInBytes: $bitSetWidthInBytes, rowSizeInBytes: $rowSizeInBytes") } var varLenFieldsSizeInBytes = 0 expectedSchema.fields.zipWithIndex.foreach { @@ -62,21 +68,31 @@ object UnsafeRowUtils { val (offset, size) = getOffsetAndSize(row, index) if (size < 0 || offset < bitSetWidthInBytes + 8 * row.numFields || offset + size > rowSizeInBytes) { - return false + return Some(s"Variable-length field validation error: field: $field, index: $index") } varLenFieldsSizeInBytes += size case (field, index) if UnsafeRow.isFixedLength(field.dataType) && !row.isNullAt(index) => field.dataType match { case BooleanType => -if ((row.getLong(index) >> 1) != 0L) return false +if ((row.getLong(index) >> 1) != 0L) { + return Some(s"Fixed-length field validation error: field: $field, index:
[spark] branch branch-3.4 updated: [SPARK-42529][CONNECT] Support Cube and Rollup in Scala client
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 314e25f4282 [SPARK-42529][CONNECT] Support Cube and Rollup in Scala client 314e25f4282 is described below commit 314e25f4282fe220c570cbdba26e0d6ba2dad005 Author: Rui Wang AuthorDate: Wed Feb 22 23:56:38 2023 -0400 [SPARK-42529][CONNECT] Support Cube and Rollup in Scala client ### What changes were proposed in this pull request? Support Cube and Rollup in Scala client. ### Why are the changes needed? API Coverage ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #40129 from amaliujia/support_cube_rollup_pivot. Authored-by: Rui Wang Signed-off-by: Herman van Hovell (cherry picked from commit 21767d29b36c3c8d812bb3ea8946a21a8ef6e65c) Signed-off-by: Herman van Hovell --- .../main/scala/org/apache/spark/sql/Dataset.scala | 120 - .../spark/sql/RelationalGroupedDataset.scala | 16 ++- .../apache/spark/sql/PlanGenerationTestSuite.scala | 16 +++ .../explain-results/cube_column.explain| 4 + .../explain-results/cube_string.explain| 4 + .../explain-results/rollup_column.explain | 4 + .../explain-results/rollup_string.explain | 4 + .../resources/query-tests/queries/cube_column.json | 34 ++ .../query-tests/queries/cube_column.proto.bin | 7 ++ .../resources/query-tests/queries/cube_string.json | 34 ++ .../query-tests/queries/cube_string.proto.bin | 7 ++ .../query-tests/queries/rollup_column.json | 34 ++ .../query-tests/queries/rollup_column.proto.bin| 7 ++ .../query-tests/queries/rollup_string.json | 34 ++ .../query-tests/queries/rollup_string.proto.bin| 7 ++ 15 files changed, 328 insertions(+), 4 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 
c7ded04a963..560276d154e 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1055,7 +1055,125 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan: */ @scala.annotation.varargs def groupBy(cols: Column*): RelationalGroupedDataset = { -new RelationalGroupedDataset(toDF(), cols.map(_.expr)) +new RelationalGroupedDataset( + toDF(), + cols.map(_.expr), + proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY) + } + + /** + * Create a multi-dimensional rollup for the current Dataset using the specified columns, so we + * can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate + * functions. + * + * {{{ + * // Compute the average for all numeric columns rolled up by department and group. + * ds.rollup($"department", $"group").avg() + * + * // Compute the max age and average salary, rolled up by department and gender. + * ds.rollup($"department", $"gender").agg(Map( + * "salary" -> "avg", + * "age" -> "max" + * )) + * }}} + * + * @group untypedrel + * @since 3.4.0 + */ + @scala.annotation.varargs + def rollup(cols: Column*): RelationalGroupedDataset = { +new RelationalGroupedDataset( + toDF(), + cols.map(_.expr), + proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP) + } + + /** + * Create a multi-dimensional rollup for the current Dataset using the specified columns, so we + * can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate + * functions. + * + * This is a variant of rollup that can only group by existing columns using column names (i.e. + * cannot construct expressions). + * + * {{{ + * // Compute the average for all numeric columns rolled up by department and group. + * ds.rollup("department", "group").avg() + * + * // Compute the max age and average salary, rolled up by department and gender. 
+ * ds.rollup($"department", $"gender").agg(Map( + * "salary" -> "avg", + * "age" -> "max" + * )) + * }}} + * + * @group untypedrel + * @since 3.4.0 + */ + @scala.annotation.varargs + def rollup(col1: String, cols: String*): RelationalGroupedDataset = { +val colNames: Seq[String] = col1 +: cols +new RelationalGroupedDataset( + toDF(), + colNames.map(colName => Column(colName).expr), + proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP) + } + + /** + * Create a multi-dimensional cube for the current Dataset using the specified columns, so we + * can run
[spark] branch master updated: [SPARK-42529][CONNECT] Support Cube and Rollup in Scala client
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 21767d29b36 [SPARK-42529][CONNECT] Support Cube and Rollup in Scala client 21767d29b36 is described below commit 21767d29b36c3c8d812bb3ea8946a21a8ef6e65c Author: Rui Wang AuthorDate: Wed Feb 22 23:56:38 2023 -0400 [SPARK-42529][CONNECT] Support Cube and Rollup in Scala client ### What changes were proposed in this pull request? Support Cube and Rollup in Scala client. ### Why are the changes needed? API Coverage ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #40129 from amaliujia/support_cube_rollup_pivot. Authored-by: Rui Wang Signed-off-by: Herman van Hovell --- .../main/scala/org/apache/spark/sql/Dataset.scala | 120 - .../spark/sql/RelationalGroupedDataset.scala | 16 ++- .../apache/spark/sql/PlanGenerationTestSuite.scala | 16 +++ .../explain-results/cube_column.explain| 4 + .../explain-results/cube_string.explain| 4 + .../explain-results/rollup_column.explain | 4 + .../explain-results/rollup_string.explain | 4 + .../resources/query-tests/queries/cube_column.json | 34 ++ .../query-tests/queries/cube_column.proto.bin | 7 ++ .../resources/query-tests/queries/cube_string.json | 34 ++ .../query-tests/queries/cube_string.proto.bin | 7 ++ .../query-tests/queries/rollup_column.json | 34 ++ .../query-tests/queries/rollup_column.proto.bin| 7 ++ .../query-tests/queries/rollup_string.json | 34 ++ .../query-tests/queries/rollup_string.proto.bin| 7 ++ 15 files changed, 328 insertions(+), 4 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index c7ded04a963..560276d154e 100644 --- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1055,7 +1055,125 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan: */ @scala.annotation.varargs def groupBy(cols: Column*): RelationalGroupedDataset = { -new RelationalGroupedDataset(toDF(), cols.map(_.expr)) +new RelationalGroupedDataset( + toDF(), + cols.map(_.expr), + proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY) + } + + /** + * Create a multi-dimensional rollup for the current Dataset using the specified columns, so we + * can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate + * functions. + * + * {{{ + * // Compute the average for all numeric columns rolled up by department and group. + * ds.rollup($"department", $"group").avg() + * + * // Compute the max age and average salary, rolled up by department and gender. + * ds.rollup($"department", $"gender").agg(Map( + * "salary" -> "avg", + * "age" -> "max" + * )) + * }}} + * + * @group untypedrel + * @since 3.4.0 + */ + @scala.annotation.varargs + def rollup(cols: Column*): RelationalGroupedDataset = { +new RelationalGroupedDataset( + toDF(), + cols.map(_.expr), + proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP) + } + + /** + * Create a multi-dimensional rollup for the current Dataset using the specified columns, so we + * can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate + * functions. + * + * This is a variant of rollup that can only group by existing columns using column names (i.e. + * cannot construct expressions). + * + * {{{ + * // Compute the average for all numeric columns rolled up by department and group. + * ds.rollup("department", "group").avg() + * + * // Compute the max age and average salary, rolled up by department and gender. 
+ * ds.rollup($"department", $"gender").agg(Map( + * "salary" -> "avg", + * "age" -> "max" + * )) + * }}} + * + * @group untypedrel + * @since 3.4.0 + */ + @scala.annotation.varargs + def rollup(col1: String, cols: String*): RelationalGroupedDataset = { +val colNames: Seq[String] = col1 +: cols +new RelationalGroupedDataset( + toDF(), + colNames.map(colName => Column(colName).expr), + proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP) + } + + /** + * Create a multi-dimensional cube for the current Dataset using the specified columns, so we + * can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate + * functions. + * + * {{{ +
[spark] branch branch-3.4 updated: [SPARK-42530][PYSPARK][DOCS] Remove Hadoop 2 from PySpark installation guide
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 3477d14d802 [SPARK-42530][PYSPARK][DOCS] Remove Hadoop 2 from PySpark installation guide 3477d14d802 is described below commit 3477d14d802a0b45970f2f99330dd4ddb9e6fefc Author: Dongjoon Hyun AuthorDate: Wed Feb 22 17:09:17 2023 -0800 [SPARK-42530][PYSPARK][DOCS] Remove Hadoop 2 from PySpark installation guide ### What changes were proposed in this pull request? This PR aims to remove `Hadoop 2` from PySpark installation guide. ### Why are the changes needed? From Apache Spark 3.4.0, we don't provide Hadoop 2 binaries. ### Does this PR introduce _any_ user-facing change? This is a documentation fix to be consistent with the new availability. ### How was this patch tested? Manual review. Closes #40127 from dongjoon-hyun/SPARK-42530. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun (cherry picked from commit 295617c5d8913fc1afc78fa9647d2f99b925ceaf) Signed-off-by: Dongjoon Hyun --- python/docs/source/getting_started/install.rst | 11 +-- 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python/docs/source/getting_started/install.rst b/python/docs/source/getting_started/install.rst index be2a1eae66d..3db6b278403 100644 --- a/python/docs/source/getting_started/install.rst +++ b/python/docs/source/getting_started/install.rst @@ -57,7 +57,7 @@ For PySpark with/without a specific Hadoop version, you can install it by using .. code-block:: bash -PYSPARK_HADOOP_VERSION=2 pip install pyspark +PYSPARK_HADOOP_VERSION=3 pip install pyspark The default distribution uses Hadoop 3.3 and Hive 2.3. If users specify different versions of Hadoop, the pip installation automatically downloads a different version and uses it in PySpark. 
Downloading it can take a while depending on @@ -65,18 +65,17 @@ the network and the mirror chosen. ``PYSPARK_RELEASE_MIRROR`` can be set to manu .. code-block:: bash -PYSPARK_RELEASE_MIRROR=http://mirror.apache-kr.org PYSPARK_HADOOP_VERSION=2 pip install +PYSPARK_RELEASE_MIRROR=http://mirror.apache-kr.org PYSPARK_HADOOP_VERSION=3 pip install It is recommended to use ``-v`` option in ``pip`` to track the installation and download status. .. code-block:: bash -PYSPARK_HADOOP_VERSION=2 pip install pyspark -v +PYSPARK_HADOOP_VERSION=3 pip install pyspark -v Supported values in ``PYSPARK_HADOOP_VERSION`` are: - ``without``: Spark pre-built with user-provided Apache Hadoop -- ``2``: Spark pre-built for Apache Hadoop 2.7 - ``3``: Spark pre-built for Apache Hadoop 3.3 and later (default) Note that this installation of PySpark with/without a specific Hadoop version is experimental. It can change or be removed between minor releases. @@ -132,7 +131,7 @@ to install Spark, for example, as below: .. code-block:: bash -tar xzvf spark-3.3.0-bin-hadoop3.tgz +tar xzvf spark-3.4.0-bin-hadoop3.tgz Ensure the ``SPARK_HOME`` environment variable points to the directory where the tar file has been extracted. Update ``PYTHONPATH`` environment variable such that it can find the PySpark and Py4J under ``SPARK_HOME/python/lib``. @@ -140,7 +139,7 @@ One example of doing this is shown below: .. code-block:: bash -cd spark-3.3.0-bin-hadoop3 +cd spark-3.4.0-bin-hadoop3 export SPARK_HOME=`pwd` export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-42530][PYSPARK][DOCS] Remove Hadoop 2 from PySpark installation guide
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 295617c5d89 [SPARK-42530][PYSPARK][DOCS] Remove Hadoop 2 from PySpark installation guide 295617c5d89 is described below commit 295617c5d8913fc1afc78fa9647d2f99b925ceaf Author: Dongjoon Hyun AuthorDate: Wed Feb 22 17:09:17 2023 -0800 [SPARK-42530][PYSPARK][DOCS] Remove Hadoop 2 from PySpark installation guide ### What changes were proposed in this pull request? This PR aims to remove `Hadoop 2` from PySpark installation guide. ### Why are the changes needed? From Apache Spark 3.4.0, we don't provide Hadoop 2 binaries. ### Does this PR introduce _any_ user-facing change? This is a documentation fix to be consistent with the new availability. ### How was this patch tested? Manual review. Closes #40127 from dongjoon-hyun/SPARK-42530. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- python/docs/source/getting_started/install.rst | 11 +-- 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python/docs/source/getting_started/install.rst b/python/docs/source/getting_started/install.rst index be2a1eae66d..3db6b278403 100644 --- a/python/docs/source/getting_started/install.rst +++ b/python/docs/source/getting_started/install.rst @@ -57,7 +57,7 @@ For PySpark with/without a specific Hadoop version, you can install it by using .. code-block:: bash -PYSPARK_HADOOP_VERSION=2 pip install pyspark +PYSPARK_HADOOP_VERSION=3 pip install pyspark The default distribution uses Hadoop 3.3 and Hive 2.3. If users specify different versions of Hadoop, the pip installation automatically downloads a different version and uses it in PySpark. Downloading it can take a while depending on @@ -65,18 +65,17 @@ the network and the mirror chosen. ``PYSPARK_RELEASE_MIRROR`` can be set to manu .. 
code-block:: bash -PYSPARK_RELEASE_MIRROR=http://mirror.apache-kr.org PYSPARK_HADOOP_VERSION=2 pip install +PYSPARK_RELEASE_MIRROR=http://mirror.apache-kr.org PYSPARK_HADOOP_VERSION=3 pip install It is recommended to use ``-v`` option in ``pip`` to track the installation and download status. .. code-block:: bash -PYSPARK_HADOOP_VERSION=2 pip install pyspark -v +PYSPARK_HADOOP_VERSION=3 pip install pyspark -v Supported values in ``PYSPARK_HADOOP_VERSION`` are: - ``without``: Spark pre-built with user-provided Apache Hadoop -- ``2``: Spark pre-built for Apache Hadoop 2.7 - ``3``: Spark pre-built for Apache Hadoop 3.3 and later (default) Note that this installation of PySpark with/without a specific Hadoop version is experimental. It can change or be removed between minor releases. @@ -132,7 +131,7 @@ to install Spark, for example, as below: .. code-block:: bash -tar xzvf spark-3.3.0-bin-hadoop3.tgz +tar xzvf spark-3.4.0-bin-hadoop3.tgz Ensure the ``SPARK_HOME`` environment variable points to the directory where the tar file has been extracted. Update ``PYTHONPATH`` environment variable such that it can find the PySpark and Py4J under ``SPARK_HOME/python/lib``. @@ -140,7 +139,7 @@ One example of doing this is shown below: .. code-block:: bash -cd spark-3.3.0-bin-hadoop3 +cd spark-3.4.0-bin-hadoop3 export SPARK_HOME=`pwd` export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-42532][K8S][DOCS] Update YuniKorn docs with v1.2
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 50f2ff25dcf [SPARK-42532][K8S][DOCS] Update YuniKorn docs with v1.2 50f2ff25dcf is described below commit 50f2ff25dcfcc0d2f0d4ad98f90e4ba68fedc04e Author: Dongjoon Hyun AuthorDate: Wed Feb 22 15:02:21 2023 -0800 [SPARK-42532][K8S][DOCS] Update YuniKorn docs with v1.2 ### What changes were proposed in this pull request? This PR aims to update `YuniKorn` documentation with the latest v1.2.0 and fix codify issues in doc. ### Why are the changes needed? - https://yunikorn.apache.org/release-announce/1.2.0 ### Does this PR introduce _any_ user-facing change? This is a documentation-only change. **BEFORE** - https://dist.apache.org/repos/dist/dev/spark/v3.4.0-rc1-docs/_site/running-on-kubernetes.html#using-apache-yunikorn-as-customized-scheduler-for-spark-on-kubernetes **AFTER** https://user-images.githubusercontent.com/9700541/220775386-90268ecb-facf-4701-bcb7-4f6b3e847e70.png;> ### How was this patch tested? Manually test with YuniKorn v1.2.0. ``` $ helm list -n yunikorn NAMENAMESPACE REVISIONUPDATED STATUS CHART APP VERSION yunikornyunikorn1 2023-02-22 14:01:11.728926 -0800 PSTdeployedyunikorn-1.2.0 ``` ``` $ build/sbt -Psparkr -Pkubernetes -Pkubernetes-integration-tests -Dspark.kubernetes.test.deployMode=docker-desktop "kubernetes-integration-tests/test" -Dtest.exclude.tags=minikube,local,decom -Dtest.default.exclude.tags='' [info] KubernetesSuite: [info] - SPARK-42190: Run SparkPi with local[*] (10 seconds, 832 milliseconds) [info] - Run SparkPi with no resources (12 seconds, 421 milliseconds) [info] - Run SparkPi with no resources & statefulset allocation (17 seconds, 861 milliseconds) [info] - Run SparkPi with a very long application name. 
(12 seconds, 531 milliseconds) [info] - Use SparkLauncher.NO_RESOURCE (17 seconds, 697 milliseconds) [info] - Run SparkPi with a master URL without a scheme. (12 seconds, 499 milliseconds) [info] - Run SparkPi with an argument. (18 seconds, 734 milliseconds) [info] - Run SparkPi with custom labels, annotations, and environment variables. (12 seconds, 520 milliseconds) [info] - All pods have the same service account by default (17 seconds, 504 milliseconds) [info] - Run extraJVMOptions check on driver (9 seconds, 402 milliseconds) [info] - SPARK-42474: Run extraJVMOptions JVM GC option check - G1GC (9 seconds, 389 milliseconds) [info] - SPARK-42474: Run extraJVMOptions JVM GC option check - Other GC (9 seconds, 330 milliseconds) [info] - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j2.properties (17 seconds, 710 milliseconds) [info] - Run SparkPi with env and mount secrets. (19 seconds, 797 milliseconds) [info] - Run PySpark on simple pi.py example (18 seconds, 568 milliseconds) [info] - Run PySpark to test a pyfiles example (15 seconds, 622 milliseconds) [info] - Run PySpark with memory customization (18 seconds, 507 milliseconds) [info] - Run in client mode. (6 seconds, 185 milliseconds) [info] - Start pod creation from template (17 seconds, 696 milliseconds) [info] - SPARK-38398: Schedule pod creation from template (12 seconds, 585 milliseconds) [info] - Run SparkR on simple dataframe.R example (19 seconds, 639 milliseconds) [info] YuniKornSuite: [info] - SPARK-42190: Run SparkPi with local[*] (12 seconds, 421 milliseconds) [info] - Run SparkPi with no resources (20 seconds, 465 milliseconds) [info] - Run SparkPi with no resources & statefulset allocation (15 seconds, 516 milliseconds) [info] - Run SparkPi with a very long application name. (20 seconds, 532 milliseconds) [info] - Use SparkLauncher.NO_RESOURCE (15 seconds, 545 milliseconds) [info] - Run SparkPi with a master URL without a scheme. 
(20 seconds, 575 milliseconds) [info] - Run SparkPi with an argument. (16 seconds, 462 milliseconds) [info] - Run SparkPi with custom labels, annotations, and environment variables. (20 seconds, 568 milliseconds) [info] - All pods have the same service account by default (15 seconds, 630 milliseconds) [info] - Run extraJVMOptions check on driver (12 seconds, 483 milliseconds) [info] - SPARK-42474: Run extraJVMOptions JVM GC option check - G1GC (12 seconds, 665 milliseconds) [info] - SPARK-42474: Run extraJVMOptions JVM GC option check - Other GC (11 seconds, 615 milliseconds) [info] - Verify logging configuration is picked from the provided
[spark] branch master updated: [SPARK-42532][K8S][DOCS] Update YuniKorn docs with v1.2
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5b1c45eedae [SPARK-42532][K8S][DOCS] Update YuniKorn docs with v1.2 5b1c45eedae is described below commit 5b1c45eedaed0138afb260019db800b637c3b135 Author: Dongjoon Hyun AuthorDate: Wed Feb 22 15:02:21 2023 -0800 [SPARK-42532][K8S][DOCS] Update YuniKorn docs with v1.2 ### What changes were proposed in this pull request? This PR aims to update `YuniKorn` documentation with the latest v1.2.0 and fix codify issues in doc. ### Why are the changes needed? - https://yunikorn.apache.org/release-announce/1.2.0 ### Does this PR introduce _any_ user-facing change? This is a documentation-only change. **BEFORE** - https://dist.apache.org/repos/dist/dev/spark/v3.4.0-rc1-docs/_site/running-on-kubernetes.html#using-apache-yunikorn-as-customized-scheduler-for-spark-on-kubernetes **AFTER** https://user-images.githubusercontent.com/9700541/220775386-90268ecb-facf-4701-bcb7-4f6b3e847e70.png;> ### How was this patch tested? Manually test with YuniKorn v1.2.0. ``` $ helm list -n yunikorn NAMENAMESPACE REVISIONUPDATED STATUS CHART APP VERSION yunikornyunikorn1 2023-02-22 14:01:11.728926 -0800 PSTdeployedyunikorn-1.2.0 ``` ``` $ build/sbt -Psparkr -Pkubernetes -Pkubernetes-integration-tests -Dspark.kubernetes.test.deployMode=docker-desktop "kubernetes-integration-tests/test" -Dtest.exclude.tags=minikube,local,decom -Dtest.default.exclude.tags='' [info] KubernetesSuite: [info] - SPARK-42190: Run SparkPi with local[*] (10 seconds, 832 milliseconds) [info] - Run SparkPi with no resources (12 seconds, 421 milliseconds) [info] - Run SparkPi with no resources & statefulset allocation (17 seconds, 861 milliseconds) [info] - Run SparkPi with a very long application name. 
(12 seconds, 531 milliseconds) [info] - Use SparkLauncher.NO_RESOURCE (17 seconds, 697 milliseconds) [info] - Run SparkPi with a master URL without a scheme. (12 seconds, 499 milliseconds) [info] - Run SparkPi with an argument. (18 seconds, 734 milliseconds) [info] - Run SparkPi with custom labels, annotations, and environment variables. (12 seconds, 520 milliseconds) [info] - All pods have the same service account by default (17 seconds, 504 milliseconds) [info] - Run extraJVMOptions check on driver (9 seconds, 402 milliseconds) [info] - SPARK-42474: Run extraJVMOptions JVM GC option check - G1GC (9 seconds, 389 milliseconds) [info] - SPARK-42474: Run extraJVMOptions JVM GC option check - Other GC (9 seconds, 330 milliseconds) [info] - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j2.properties (17 seconds, 710 milliseconds) [info] - Run SparkPi with env and mount secrets. (19 seconds, 797 milliseconds) [info] - Run PySpark on simple pi.py example (18 seconds, 568 milliseconds) [info] - Run PySpark to test a pyfiles example (15 seconds, 622 milliseconds) [info] - Run PySpark with memory customization (18 seconds, 507 milliseconds) [info] - Run in client mode. (6 seconds, 185 milliseconds) [info] - Start pod creation from template (17 seconds, 696 milliseconds) [info] - SPARK-38398: Schedule pod creation from template (12 seconds, 585 milliseconds) [info] - Run SparkR on simple dataframe.R example (19 seconds, 639 milliseconds) [info] YuniKornSuite: [info] - SPARK-42190: Run SparkPi with local[*] (12 seconds, 421 milliseconds) [info] - Run SparkPi with no resources (20 seconds, 465 milliseconds) [info] - Run SparkPi with no resources & statefulset allocation (15 seconds, 516 milliseconds) [info] - Run SparkPi with a very long application name. (20 seconds, 532 milliseconds) [info] - Use SparkLauncher.NO_RESOURCE (15 seconds, 545 milliseconds) [info] - Run SparkPi with a master URL without a scheme. 
(20 seconds, 575 milliseconds) [info] - Run SparkPi with an argument. (16 seconds, 462 milliseconds) [info] - Run SparkPi with custom labels, annotations, and environment variables. (20 seconds, 568 milliseconds) [info] - All pods have the same service account by default (15 seconds, 630 milliseconds) [info] - Run extraJVMOptions check on driver (12 seconds, 483 milliseconds) [info] - SPARK-42474: Run extraJVMOptions JVM GC option check - G1GC (12 seconds, 665 milliseconds) [info] - SPARK-42474: Run extraJVMOptions JVM GC option check - Other GC (11 seconds, 615 milliseconds) [info] - Verify logging configuration is picked from the provided
[spark] branch branch-3.4 updated: [SPARK-42150][K8S][DOCS][FOLLOWUP] Use v1.7.0 in docs
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 6b5885b27db [SPARK-42150][K8S][DOCS][FOLLOWUP] Use v1.7.0 in docs 6b5885b27db is described below commit 6b5885b27db60463b4de88b6f3259bb616b7e36b Author: Dongjoon Hyun AuthorDate: Wed Feb 22 14:14:22 2023 -0800 [SPARK-42150][K8S][DOCS][FOLLOWUP] Use v1.7.0 in docs ### What changes were proposed in this pull request? This is a follow-up of #39690. ### Why are the changes needed? To be consistent across multiple docs. ### Does this PR introduce _any_ user-facing change? No, this is a doc-only change. ### How was this patch tested? Manual review. Closes #40131 from dongjoon-hyun/SPARK-42150-2. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun (cherry picked from commit 70e69892b55d09064c760b92ba941289f9def005) Signed-off-by: Dongjoon Hyun --- docs/running-on-kubernetes.md| 8 ++-- resource-managers/kubernetes/integration-tests/README.md | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 9174697ca97..4f2647d3e06 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1848,14 +1848,10 @@ Spark allows users to specify a custom Kubernetes schedulers. **This feature is currently experimental. In future versions, there may be behavioral changes around configuration, feature step improvement.** # Prerequisites -* Spark on Kubernetes with [Volcano](https://volcano.sh/en) as a custom scheduler is supported since Spark v3.3.0 and Volcano v1.5.1. Below is an example to install Volcano 1.5.1: +* Spark on Kubernetes with [Volcano](https://volcano.sh/en) as a custom scheduler is supported since Spark v3.3.0 and Volcano v1.7.0. 
Below is an example to install Volcano 1.7.0: ```bash - # x86_64 - kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/v1.5.1/installer/volcano-development.yaml - - # arm64: - kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/v1.5.1/installer/volcano-development-arm64.yaml + kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/v1.7.0/installer/volcano-development.yaml ``` # Build diff --git a/resource-managers/kubernetes/integration-tests/README.md b/resource-managers/kubernetes/integration-tests/README.md index c3bce2415e4..0a03be42b4b 100644 --- a/resource-managers/kubernetes/integration-tests/README.md +++ b/resource-managers/kubernetes/integration-tests/README.md @@ -335,7 +335,7 @@ Volcano integration is experimental in Apache Spark 3.3.0 and the test coverage ## Requirements - A minimum of 6 CPUs and 9G of memory is required to complete all Volcano test cases. -- Volcano v1.5.1. +- Volcano v1.7.0. ## Installation - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-42150][K8S][DOCS][FOLLOWUP] Use v1.7.0 in docs
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 70e69892b55 [SPARK-42150][K8S][DOCS][FOLLOWUP] Use v1.7.0 in docs 70e69892b55 is described below commit 70e69892b55d09064c760b92ba941289f9def005 Author: Dongjoon Hyun AuthorDate: Wed Feb 22 14:14:22 2023 -0800 [SPARK-42150][K8S][DOCS][FOLLOWUP] Use v1.7.0 in docs ### What changes were proposed in this pull request? This is a follow-up of #39690. ### Why are the changes needed? To be consistent across multiple docs. ### Does this PR introduce _any_ user-facing change? No, this is a doc-only change. ### How was this patch tested? Manual review. Closes #40131 from dongjoon-hyun/SPARK-42150-2. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- docs/running-on-kubernetes.md| 8 ++-- resource-managers/kubernetes/integration-tests/README.md | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 9174697ca97..4f2647d3e06 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1848,14 +1848,10 @@ Spark allows users to specify a custom Kubernetes schedulers. **This feature is currently experimental. In future versions, there may be behavioral changes around configuration, feature step improvement.** # Prerequisites -* Spark on Kubernetes with [Volcano](https://volcano.sh/en) as a custom scheduler is supported since Spark v3.3.0 and Volcano v1.5.1. Below is an example to install Volcano 1.5.1: +* Spark on Kubernetes with [Volcano](https://volcano.sh/en) as a custom scheduler is supported since Spark v3.3.0 and Volcano v1.7.0. 
Below is an example to install Volcano 1.7.0: ```bash - # x86_64 - kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/v1.5.1/installer/volcano-development.yaml - - # arm64: - kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/v1.5.1/installer/volcano-development-arm64.yaml + kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/v1.7.0/installer/volcano-development.yaml ``` # Build diff --git a/resource-managers/kubernetes/integration-tests/README.md b/resource-managers/kubernetes/integration-tests/README.md index c3bce2415e4..0a03be42b4b 100644 --- a/resource-managers/kubernetes/integration-tests/README.md +++ b/resource-managers/kubernetes/integration-tests/README.md @@ -335,7 +335,7 @@ Volcano integration is experimental in Apache Spark 3.3.0 and the test coverage ## Requirements - A minimum of 6 CPUs and 9G of memory is required to complete all Volcano test cases. -- Volcano v1.5.1. +- Volcano v1.7.0. ## Installation - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (dbe23c8e88d -> 1232309e44d)
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from dbe23c8e88d [SPARK-42522][CONNECT] Fix DataFrameWriterV2 to find the default source add 1232309e44d [SPARK-42468][CONNECT][FOLLOW-UP] Add .agg variants in Dataset No new revisions were added by this update. Summary of changes: .../main/scala/org/apache/spark/sql/Dataset.scala | 55 ++ 1 file changed, 55 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-42468][CONNECT][FOLLOW-UP] Add .agg variants in Dataset
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 6998f973d10 [SPARK-42468][CONNECT][FOLLOW-UP] Add .agg variants in Dataset 6998f973d10 is described below commit 6998f973d10985a92a5e1b9119d8f20a935cb33f Author: Rui Wang AuthorDate: Wed Feb 22 16:54:59 2023 -0400 [SPARK-42468][CONNECT][FOLLOW-UP] Add .agg variants in Dataset ### What changes were proposed in this pull request? Add `.agg` in Dataset in Scala client. ### Why are the changes needed? API coverage. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Existing UT Closes #40125 from amaliujia/rw_add_agg_dataset. Authored-by: Rui Wang Signed-off-by: Herman van Hovell (cherry picked from commit 1232309e44d8ed65528c2b29ee4087e4173a3e06) Signed-off-by: Herman van Hovell --- .../main/scala/org/apache/spark/sql/Dataset.scala | 55 ++ 1 file changed, 55 insertions(+) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 33125e5fd87..c7ded04a963 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1058,6 +1058,61 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan: new RelationalGroupedDataset(toDF(), cols.map(_.expr)) } + /** + * (Scala-specific) Aggregates on the entire Dataset without groups. + * {{{ + * // ds.agg(...) is a shorthand for ds.groupBy().agg(...) 
+ * ds.agg("age" -> "max", "salary" -> "avg") + * ds.groupBy().agg("age" -> "max", "salary" -> "avg") + * }}} + * + * @group untypedrel + * @since 3.4.0 + */ + def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = { +groupBy().agg(aggExpr, aggExprs: _*) + } + + /** + * (Scala-specific) Aggregates on the entire Dataset without groups. + * {{{ + * // ds.agg(...) is a shorthand for ds.groupBy().agg(...) + * ds.agg(Map("age" -> "max", "salary" -> "avg")) + * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) + * }}} + * + * @group untypedrel + * @since 3.4.0 + */ + def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs) + + /** + * (Java-specific) Aggregates on the entire Dataset without groups. + * {{{ + * // ds.agg(...) is a shorthand for ds.groupBy().agg(...) + * ds.agg(Map("age" -> "max", "salary" -> "avg")) + * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) + * }}} + * + * @group untypedrel + * @since 3.4.0 + */ + def agg(exprs: java.util.Map[String, String]): DataFrame = groupBy().agg(exprs) + + /** + * Aggregates on the entire Dataset without groups. + * {{{ + * // ds.agg(...) is a shorthand for ds.groupBy().agg(...) + * ds.agg(max($"age"), avg($"salary")) + * ds.groupBy().agg(max($"age"), avg($"salary")) + * }}} + * + * @group untypedrel + * @since 3.4.0 + */ + @scala.annotation.varargs + def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs: _*) + /** * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns * set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-42522][CONNECT] Fix DataFrameWriterV2 to find the default source
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 51e82756a9a [SPARK-42522][CONNECT] Fix DataFrameWriterV2 to find the default source 51e82756a9a is described below commit 51e82756a9af67c86e2d4dddc9e683c938c2b5ed Author: Takuya UESHIN AuthorDate: Wed Feb 22 16:53:06 2023 -0400 [SPARK-42522][CONNECT] Fix DataFrameWriterV2 to find the default source ### What changes were proposed in this pull request? Fixes `DataFrameWriterV2` to find the default source. ### Why are the changes needed? Currently `DataFrameWriterV2` in Spark Connect doesn't work without the provider with a weird error: For example: ```py df.writeTo("test_table").create() ``` ``` pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkClassNotFoundException) [DATA_SOURCE_NOT_FOUND] Failed to find the data source: . Please find packages at `https://spark.apache.org/third-party-projects.html`. ``` ### Does this PR introduce _any_ user-facing change? Users will be able to use `DataFrameWriterV2` without provider as same as PySpark. ### How was this patch tested? Added some tests. Closes #40109 from ueshin/issues/SPARK-42522/writer_v2. 
Authored-by: Takuya UESHIN Signed-off-by: Herman van Hovell (cherry picked from commit dbe23c8e88d1a2968ae1c17ec9ee3029ef7a348a) Signed-off-by: Herman van Hovell --- .../src/main/protobuf/spark/connect/commands.proto | 2 +- .../spark/sql/connect/planner/SparkConnectPlanner.scala | 6 +++--- python/pyspark/sql/connect/proto/commands_pb2.py | 12 ++-- python/pyspark/sql/connect/proto/commands_pb2.pyi| 16 ++-- python/pyspark/sql/tests/test_readwriter.py | 12 5 files changed, 36 insertions(+), 12 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto index 7567b0e3d7c..1f2f473a050 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto @@ -128,7 +128,7 @@ message WriteOperationV2 { // (Optional) A provider for the underlying output data source. Spark's default catalog supports // "parquet", "json", etc. 
- string provider = 3; + optional string provider = 3; // (Optional) List of columns for partitioning for output table created by `create`, // `createOrReplace`, or `replace` diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index a14d3632d28..268bf02fad9 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1614,7 +1614,7 @@ class SparkConnectPlanner(val session: SparkSession) { writeOperation.getMode match { case proto.WriteOperationV2.Mode.MODE_CREATE => -if (writeOperation.getProvider != null) { +if (writeOperation.hasProvider) { w.using(writeOperation.getProvider).create() } else { w.create() @@ -1626,13 +1626,13 @@ class SparkConnectPlanner(val session: SparkSession) { case proto.WriteOperationV2.Mode.MODE_APPEND => w.append() case proto.WriteOperationV2.Mode.MODE_REPLACE => -if (writeOperation.getProvider != null) { +if (writeOperation.hasProvider) { w.using(writeOperation.getProvider).replace() } else { w.replace() } case proto.WriteOperationV2.Mode.MODE_CREATE_OR_REPLACE => -if (writeOperation.getProvider != null) { +if (writeOperation.hasProvider) { w.using(writeOperation.getProvider).createOrReplace() } else { w.createOrReplace() diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py index faa7dd65e2e..c8ade1ea81b 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.py +++ b/python/pyspark/sql/connect/proto/commands_pb2.py @@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( -
[spark] branch master updated: [SPARK-42522][CONNECT] Fix DataFrameWriterV2 to find the default source
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new dbe23c8e88d [SPARK-42522][CONNECT] Fix DataFrameWriterV2 to find the default source dbe23c8e88d is described below commit dbe23c8e88d1a2968ae1c17ec9ee3029ef7a348a Author: Takuya UESHIN AuthorDate: Wed Feb 22 16:53:06 2023 -0400 [SPARK-42522][CONNECT] Fix DataFrameWriterV2 to find the default source ### What changes were proposed in this pull request? Fixes `DataFrameWriterV2` to find the default source. ### Why are the changes needed? Currently `DataFrameWriterV2` in Spark Connect doesn't work without the provider with a weird error: For example: ```py df.writeTo("test_table").create() ``` ``` pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkClassNotFoundException) [DATA_SOURCE_NOT_FOUND] Failed to find the data source: . Please find packages at `https://spark.apache.org/third-party-projects.html`. ``` ### Does this PR introduce _any_ user-facing change? Users will be able to use `DataFrameWriterV2` without provider as same as PySpark. ### How was this patch tested? Added some tests. Closes #40109 from ueshin/issues/SPARK-42522/writer_v2. 
Authored-by: Takuya UESHIN Signed-off-by: Herman van Hovell --- .../src/main/protobuf/spark/connect/commands.proto | 2 +- .../spark/sql/connect/planner/SparkConnectPlanner.scala | 6 +++--- python/pyspark/sql/connect/proto/commands_pb2.py | 12 ++-- python/pyspark/sql/connect/proto/commands_pb2.pyi| 16 ++-- python/pyspark/sql/tests/test_readwriter.py | 12 5 files changed, 36 insertions(+), 12 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto index 7567b0e3d7c..1f2f473a050 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto @@ -128,7 +128,7 @@ message WriteOperationV2 { // (Optional) A provider for the underlying output data source. Spark's default catalog supports // "parquet", "json", etc. - string provider = 3; + optional string provider = 3; // (Optional) List of columns for partitioning for output table created by `create`, // `createOrReplace`, or `replace` diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index a14d3632d28..268bf02fad9 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1614,7 +1614,7 @@ class SparkConnectPlanner(val session: SparkSession) { writeOperation.getMode match { case proto.WriteOperationV2.Mode.MODE_CREATE => -if (writeOperation.getProvider != null) { +if (writeOperation.hasProvider) { w.using(writeOperation.getProvider).create() } else { w.create() @@ -1626,13 +1626,13 @@ class SparkConnectPlanner(val session: SparkSession) { case 
proto.WriteOperationV2.Mode.MODE_APPEND => w.append() case proto.WriteOperationV2.Mode.MODE_REPLACE => -if (writeOperation.getProvider != null) { +if (writeOperation.hasProvider) { w.using(writeOperation.getProvider).replace() } else { w.replace() } case proto.WriteOperationV2.Mode.MODE_CREATE_OR_REPLACE => -if (writeOperation.getProvider != null) { +if (writeOperation.hasProvider) { w.using(writeOperation.getProvider).createOrReplace() } else { w.createOrReplace() diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py index faa7dd65e2e..c8ade1ea81b 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.py +++ b/python/pyspark/sql/connect/proto/commands_pb2.py @@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xab\x03\n\x07\x43ommand\x12]\n\x11register_function\x18\x01
[spark] branch branch-3.4 updated: [SPARK-42518][CONNECT] Scala Client DataFrameWriterV2
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 6dd52fe251e [SPARK-42518][CONNECT] Scala Client DataFrameWriterV2 6dd52fe251e is described below commit 6dd52fe251e696fe56abcd8574f0e7f34c344654 Author: Zhen Li AuthorDate: Wed Feb 22 16:51:23 2023 -0400 [SPARK-42518][CONNECT] Scala Client DataFrameWriterV2 ### What changes were proposed in this pull request? Adding DataFrameWriterV2. This allows users to use the Dataset#writeTo API. ### Why are the changes needed? Impls Dataset#writeTo ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? E2E This is based on https://github.com/apache/spark/pull/40061 Closes #40075 from zhenlineo/write-v2. Authored-by: Zhen Li Signed-off-by: Herman van Hovell (cherry picked from commit 0c4645eb6bb4740b92281d124053d4610090da34) Signed-off-by: Herman van Hovell --- .../org/apache/spark/sql/DataFrameWriterV2.scala | 289 + .../main/scala/org/apache/spark/sql/Dataset.scala | 23 ++ .../org/apache/spark/sql/ClientE2ETestSuite.scala | 43 ++- .../scala/org/apache/spark/sql/DatasetSuite.scala | 29 +++ .../sql/connect/client/CompatibilitySuite.scala| 11 +- .../connect/client/util/RemoteSparkSession.scala | 12 + 6 files changed, 399 insertions(+), 8 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala new file mode 100644 index 000..ed149223129 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.annotation.Experimental +import org.apache.spark.connect.proto + +/** + * Interface used to write a [[org.apache.spark.sql.Dataset]] to external storage using the v2 + * API. 
+ * + * @since 3.4.0 + */ +@Experimental +final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T]) +extends CreateTableWriter[T] { + + private var provider: Option[String] = None + + private val options = new mutable.HashMap[String, String]() + + private val properties = new mutable.HashMap[String, String]() + + private var partitioning: Option[Seq[proto.Expression]] = None + + private var overwriteCondition: Option[proto.Expression] = None + + override def using(provider: String): CreateTableWriter[T] = { +this.provider = Some(provider) +this + } + + override def option(key: String, value: String): DataFrameWriterV2[T] = { +this.options.put(key, value) +this + } + + override def options(options: scala.collection.Map[String, String]): DataFrameWriterV2[T] = { +options.foreach { case (key, value) => + this.options.put(key, value) +} +this + } + + override def options(options: java.util.Map[String, String]): DataFrameWriterV2[T] = { +this.options(options.asScala) +this + } + + override def tableProperty(property: String, value: String): CreateTableWriter[T] = { +this.properties.put(property, value) +this + } + + @scala.annotation.varargs + override def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] = { +val asTransforms = (column +: columns).map(_.expr) +this.partitioning = Some(asTransforms) +this + } + + override def create(): Unit = { +executeWriteOperation(proto.WriteOperationV2.Mode.MODE_CREATE) + } + + override def replace(): Unit = { +executeWriteOperation(proto.WriteOperationV2.Mode.MODE_REPLACE) + } + + override def createOrReplace(): Unit = { +executeWriteOperation(proto.WriteOperationV2.Mode.MODE_CREATE_OR_REPLACE) + } + + /** + * Append the contents of the data frame to the output table. + * + * If the
[spark] branch master updated: [SPARK-42518][CONNECT] Scala Client DataFrameWriterV2
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0c4645eb6bb [SPARK-42518][CONNECT] Scala Client DataFrameWriterV2 0c4645eb6bb is described below commit 0c4645eb6bb4740b92281d124053d4610090da34 Author: Zhen Li AuthorDate: Wed Feb 22 16:51:23 2023 -0400 [SPARK-42518][CONNECT] Scala Client DataFrameWriterV2 ### What changes were proposed in this pull request? Adding DataFrameWriterV2. This allows users to use the Dataset#writeTo API. ### Why are the changes needed? Impls Dataset#writeTo ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? E2E This is based on https://github.com/apache/spark/pull/40061 Closes #40075 from zhenlineo/write-v2. Authored-by: Zhen Li Signed-off-by: Herman van Hovell --- .../org/apache/spark/sql/DataFrameWriterV2.scala | 289 + .../main/scala/org/apache/spark/sql/Dataset.scala | 23 ++ .../org/apache/spark/sql/ClientE2ETestSuite.scala | 43 ++- .../scala/org/apache/spark/sql/DatasetSuite.scala | 29 +++ .../sql/connect/client/CompatibilitySuite.scala| 11 +- .../connect/client/util/RemoteSparkSession.scala | 12 + 6 files changed, 399 insertions(+), 8 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala new file mode 100644 index 000..ed149223129 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.annotation.Experimental +import org.apache.spark.connect.proto + +/** + * Interface used to write a [[org.apache.spark.sql.Dataset]] to external storage using the v2 + * API. + * + * @since 3.4.0 + */ +@Experimental +final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T]) +extends CreateTableWriter[T] { + + private var provider: Option[String] = None + + private val options = new mutable.HashMap[String, String]() + + private val properties = new mutable.HashMap[String, String]() + + private var partitioning: Option[Seq[proto.Expression]] = None + + private var overwriteCondition: Option[proto.Expression] = None + + override def using(provider: String): CreateTableWriter[T] = { +this.provider = Some(provider) +this + } + + override def option(key: String, value: String): DataFrameWriterV2[T] = { +this.options.put(key, value) +this + } + + override def options(options: scala.collection.Map[String, String]): DataFrameWriterV2[T] = { +options.foreach { case (key, value) => + this.options.put(key, value) +} +this + } + + override def options(options: java.util.Map[String, String]): DataFrameWriterV2[T] = { +this.options(options.asScala) +this + } + + override def tableProperty(property: String, value: String): CreateTableWriter[T] = { 
+this.properties.put(property, value) +this + } + + @scala.annotation.varargs + override def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] = { +val asTransforms = (column +: columns).map(_.expr) +this.partitioning = Some(asTransforms) +this + } + + override def create(): Unit = { +executeWriteOperation(proto.WriteOperationV2.Mode.MODE_CREATE) + } + + override def replace(): Unit = { +executeWriteOperation(proto.WriteOperationV2.Mode.MODE_REPLACE) + } + + override def createOrReplace(): Unit = { +executeWriteOperation(proto.WriteOperationV2.Mode.MODE_CREATE_OR_REPLACE) + } + + /** + * Append the contents of the data frame to the output table. + * + * If the output table does not exist, this operation will fail with + *
[spark] branch branch-3.4 updated: [SPARK-42272][CONNECT][TESTS][FOLLOW-UP] Do not cache local port in SparkConnectService
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new fcbb19ac4d3 [SPARK-42272][CONNEC][TESTS][FOLLOW-UP] Do not cache local port in SparkConnectService fcbb19ac4d3 is described below commit fcbb19ac4d34263e15661aecdb57c534d95df8ed Author: Hyukjin Kwon AuthorDate: Wed Feb 22 11:02:35 2023 -0400 [SPARK-42272][CONNEC][TESTS][FOLLOW-UP] Do not cache local port in SparkConnectService ### What changes were proposed in this pull request? This PR proposes to do not cache local port. ### Why are the changes needed? When Spark Context is stopped, and started again, the Spark Connect server shuts down and starts up again too (while JVM itself is alive). So, we should not cache the local port but have the new local port. For example, in https://github.com/apache/spark/pull/40109, the Spark Connect server at `ReadwriterTestsMixin` stops after the tests. And then, `ReadwriterV2TestsMixin` starts the new Spark Connect server which causes failures on any actual protobuf message exchanges to the server. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? I tested it on the top of https://github.com/apache/spark/pull/40109. That PR should validate it. Closes #40123 from HyukjinKwon/SPARK-42272-followup. 
Authored-by: Hyukjin Kwon Signed-off-by: Herman van Hovell (cherry picked from commit 8cc8dd6f67a31cb46a228a7fd06ab0c18a462c07) Signed-off-by: Herman van Hovell --- .../org/apache/spark/sql/connect/service/SparkConnectService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 0a5b4197b78..959aceaf46a 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -241,7 +241,7 @@ object SparkConnectService { private[connect] var server: Server = _ // For testing purpose, it's package level private. - private[connect] lazy val localPort = { + private[connect] def localPort: Int = { assert(server != null) // Return the actual local port being used. This can be different from the configured port // when the server binds to the port 0 as an example. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-42272][CONNECT][TESTS][FOLLOW-UP] Do not cache local port in SparkConnectService
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8cc8dd6f67a [SPARK-42272][CONNEC][TESTS][FOLLOW-UP] Do not cache local port in SparkConnectService 8cc8dd6f67a is described below commit 8cc8dd6f67a31cb46a228a7fd06ab0c18a462c07 Author: Hyukjin Kwon AuthorDate: Wed Feb 22 11:02:35 2023 -0400 [SPARK-42272][CONNEC][TESTS][FOLLOW-UP] Do not cache local port in SparkConnectService ### What changes were proposed in this pull request? This PR proposes to do not cache local port. ### Why are the changes needed? When Spark Context is stopped, and started again, the Spark Connect server shuts down and starts up again too (while JVM itself is alive). So, we should not cache the local port but have the new local port. For example, in https://github.com/apache/spark/pull/40109, the Spark Connect server at `ReadwriterTestsMixin` stops after the tests. And then, `ReadwriterV2TestsMixin` starts the new Spark Connect server which causes failures on any actual protobuf message exchanges to the server. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? I tested it on the top of https://github.com/apache/spark/pull/40109. That PR should validate it. Closes #40123 from HyukjinKwon/SPARK-42272-followup. 
Authored-by: Hyukjin Kwon Signed-off-by: Herman van Hovell --- .../org/apache/spark/sql/connect/service/SparkConnectService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 0a5b4197b78..959aceaf46a 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -241,7 +241,7 @@ object SparkConnectService { private[connect] var server: Server = _ // For testing purpose, it's package level private. - private[connect] lazy val localPort = { + private[connect] def localPort: Int = { assert(server != null) // Return the actual local port being used. This can be different from the configured port // when the server binds to the port 0 as an example. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-42527][CONNECT] Scala Client add Window functions
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 921a633c86c [SPARK-42527][CONNECT] Scala Client add Window functions 921a633c86c is described below commit 921a633c86ce57f4498f5f355af37f80db832298 Author: yangjie01 AuthorDate: Wed Feb 22 10:53:15 2023 -0400 [SPARK-42527][CONNECT] Scala Client add Window functions ### What changes were proposed in this pull request? This PR aims to add the window functions to the Scala spark connect client. ### Why are the changes needed? Provide same APIs in the Scala spark connect client as in the original Dataset API. ### Does this PR introduce _any_ user-facing change? Yes, it adds new functions to the Spark Connect Scala client. ### How was this patch tested? - Add new test - Manual checked connect-client-jvm and connect with Scala-2.13 Closes #40120 from LuciferYang/window-functions. 
Authored-by: yangjie01 Signed-off-by: Herman van Hovell (cherry picked from commit e2f65b5316ed1473518e2d79e89c9bed756029e9) Signed-off-by: Herman van Hovell --- .../scala/org/apache/spark/sql/functions.scala | 256 - .../org/apache/spark/sql/FunctionTestSuite.scala | 14 ++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 39 .../explain-results/function_cume_dist.explain | 5 + .../explain-results/function_dense_rank.explain| 5 + .../explain-results/function_lag.explain | 5 + .../explain-results/function_lead.explain | 5 + .../explain-results/function_nth_value.explain | 5 + .../explain-results/function_ntile.explain | 5 + .../explain-results/function_percent_rank.explain | 5 + .../explain-results/function_rank.explain | 5 + .../explain-results/function_row_number.explain| 5 + .../query-tests/queries/function_cume_dist.json| 32 +++ .../queries/function_cume_dist.proto.bin | 7 + .../query-tests/queries/function_dense_rank.json | 32 +++ .../queries/function_dense_rank.proto.bin | 8 + .../query-tests/queries/function_lag.json | 52 + .../query-tests/queries/function_lag.proto.bin | Bin 0 -> 209 bytes .../query-tests/queries/function_lead.json | 49 .../query-tests/queries/function_lead.proto.bin| 11 + .../query-tests/queries/function_nth_value.json| 45 .../queries/function_nth_value.proto.bin | 10 + .../query-tests/queries/function_ntile.json| 37 +++ .../query-tests/queries/function_ntile.proto.bin | 8 + .../query-tests/queries/function_percent_rank.json | 32 +++ .../queries/function_percent_rank.proto.bin| 7 + .../query-tests/queries/function_rank.json | 32 +++ .../query-tests/queries/function_rank.proto.bin| 7 + .../query-tests/queries/function_row_number.json | 32 +++ .../queries/function_row_number.proto.bin | 8 + .../sql/connect/planner/SparkConnectPlanner.scala | 20 ++ 31 files changed, 782 insertions(+), 1 deletion(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 4996b5033e3..0fd35b570f8 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -100,6 +100,9 @@ object functions { .setValue(value) } + private val nullType = + proto.DataType.newBuilder().setNull(proto.DataType.NULL.getDefaultInstance).build() + /** * Creates a [[Column]] of literal value. * @@ -129,7 +132,7 @@ object functions { case v: Array[Byte] => createLiteral(_.setBinary(ByteString.copyFrom(v))) case v: collection.mutable.WrappedArray[_] => lit(v.array) case v: LocalDate => createLiteral(_.setDate(v.toEpochDay.toInt)) - case null => unsupported("Null literals not supported yet.") + case null => createLiteral(_.setNull(nullType)) case _ => unsupported(s"literal $literal not supported (yet).") } } @@ -895,6 +898,257 @@ object functions { */ def var_pop(columnName: String): Column = var_pop(Column(columnName)) + // + // Window functions + // + + /** + * Window function: returns the cumulative distribution of values within a window partition, + * i.e. the fraction of rows that are below the current row. + * + * {{{ + * N = total number of rows in the partition + * cumeDist(x) =
[spark] branch master updated: [SPARK-42527][CONNECT] Scala Client add Window functions
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e2f65b5316e [SPARK-42527][CONNECT] Scala Client add Window functions e2f65b5316e is described below commit e2f65b5316ed1473518e2d79e89c9bed756029e9 Author: yangjie01 AuthorDate: Wed Feb 22 10:53:15 2023 -0400 [SPARK-42527][CONNECT] Scala Client add Window functions ### What changes were proposed in this pull request? This PR aims to add the window functions to the Scala spark connect client. ### Why are the changes needed? Provide same APIs in the Scala spark connect client as in the original Dataset API. ### Does this PR introduce _any_ user-facing change? Yes, it adds new functions to the Spark Connect Scala client. ### How was this patch tested? - Add new test - Manual checked connect-client-jvm and connect with Scala-2.13 Closes #40120 from LuciferYang/window-functions. 
Authored-by: yangjie01 Signed-off-by: Herman van Hovell --- .../scala/org/apache/spark/sql/functions.scala | 256 - .../org/apache/spark/sql/FunctionTestSuite.scala | 14 ++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 39 .../explain-results/function_cume_dist.explain | 5 + .../explain-results/function_dense_rank.explain| 5 + .../explain-results/function_lag.explain | 5 + .../explain-results/function_lead.explain | 5 + .../explain-results/function_nth_value.explain | 5 + .../explain-results/function_ntile.explain | 5 + .../explain-results/function_percent_rank.explain | 5 + .../explain-results/function_rank.explain | 5 + .../explain-results/function_row_number.explain| 5 + .../query-tests/queries/function_cume_dist.json| 32 +++ .../queries/function_cume_dist.proto.bin | 7 + .../query-tests/queries/function_dense_rank.json | 32 +++ .../queries/function_dense_rank.proto.bin | 8 + .../query-tests/queries/function_lag.json | 52 + .../query-tests/queries/function_lag.proto.bin | Bin 0 -> 209 bytes .../query-tests/queries/function_lead.json | 49 .../query-tests/queries/function_lead.proto.bin| 11 + .../query-tests/queries/function_nth_value.json| 45 .../queries/function_nth_value.proto.bin | 10 + .../query-tests/queries/function_ntile.json| 37 +++ .../query-tests/queries/function_ntile.proto.bin | 8 + .../query-tests/queries/function_percent_rank.json | 32 +++ .../queries/function_percent_rank.proto.bin| 7 + .../query-tests/queries/function_rank.json | 32 +++ .../query-tests/queries/function_rank.proto.bin| 7 + .../query-tests/queries/function_row_number.json | 32 +++ .../queries/function_row_number.proto.bin | 8 + .../sql/connect/planner/SparkConnectPlanner.scala | 20 ++ 31 files changed, 782 insertions(+), 1 deletion(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 4996b5033e3..0fd35b570f8 100644 --- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -100,6 +100,9 @@ object functions { .setValue(value) } + private val nullType = + proto.DataType.newBuilder().setNull(proto.DataType.NULL.getDefaultInstance).build() + /** * Creates a [[Column]] of literal value. * @@ -129,7 +132,7 @@ object functions { case v: Array[Byte] => createLiteral(_.setBinary(ByteString.copyFrom(v))) case v: collection.mutable.WrappedArray[_] => lit(v.array) case v: LocalDate => createLiteral(_.setDate(v.toEpochDay.toInt)) - case null => unsupported("Null literals not supported yet.") + case null => createLiteral(_.setNull(nullType)) case _ => unsupported(s"literal $literal not supported (yet).") } } @@ -895,6 +898,257 @@ object functions { */ def var_pop(columnName: String): Column = var_pop(Column(columnName)) + // + // Window functions + // + + /** + * Window function: returns the cumulative distribution of values within a window partition, + * i.e. the fraction of rows that are below the current row. + * + * {{{ + * N = total number of rows in the partition + * cumeDist(x) = number of values before (and including) x / N + * }}} + * + * @group window_funcs + * @since 3.4.0 + */ + def
[spark] branch branch-3.4 updated: [SPARK-41151][FOLLOW-UP][SQL] Improve the doc of `_metadata` generated columns nullability implementation
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 006babc71d1 [SPARK-41151][FOLLOW-UP][SQL] Improve the doc of `_metadata` generated columns nullability implementation 006babc71d1 is described below commit 006babc71d17ee8dfcda3f49d69f079383166e45 Author: yaohua AuthorDate: Wed Feb 22 19:53:50 2023 +0800 [SPARK-41151][FOLLOW-UP][SQL] Improve the doc of `_metadata` generated columns nullability implementation ### What changes were proposed in this pull request? Add a doc of how `_metadata` nullability is implemented for generated metadata columns. ### Why are the changes needed? Improve readability ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? N/A Closes #40035 from Yaohua628/spark-41151-doc-follow-up. Lead-authored-by: yaohua Co-authored-by: Yaohua Zhao <79476540+yaohua...@users.noreply.github.com> Signed-off-by: Wenchen Fan (cherry picked from commit 100056ad1b33e134d71239ec729e609e3a68f2c9) Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/expressions/namedExpressions.scala | 10 ++ .../spark/sql/execution/datasources/FileSourceStrategy.scala | 8 2 files changed, 18 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index d18cfea1629..52d96f92fdf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -578,6 +578,16 @@ object FileSourceGeneratedMetadataAttribute { val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = "__file_source_generated_metadata_col" + /** + * We keep generated metadata attributes nullability 
configurable here: + * 1. Before passing to readers, we create generated metadata attributes as nullable; + *Because, for row_index, the readers do not consider the column required. + *row_index can be generated with null in the process by readers. + * 2. When applying the projection, we change the nullability back to not-nullable; + *For row_index, it is generated with nulls which are then replaced, + *so it will not be null in the returned output. + *See `FileSourceStrategy` for more information + */ def apply(name: String, dataType: DataType, nullable: Boolean = false): AttributeReference = AttributeReference(name, dataType, nullable = nullable, new MetadataBuilder() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index f48e44d1aab..5838f9e5478 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -230,6 +230,14 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging { case MetadataStructColumn(attr) => attr } + // We divide metadata columns into two categories: constant and generated. + // For constant metadata columns, we create these attributes as non-nullable + // when passing to readers, since the values are always available. + // For generated metadata columns, they are set as nullable when passed to readers, + // as the values will be null when trying to read the missing column from the file. + // They are then replaced by the actual values later in the process. + // All metadata columns will be non-null in the returned output. + // We then change the nullability to non-nullable in the metadata projection node below. 
val constantMetadataColumns: mutable.Buffer[Attribute] = mutable.Buffer.empty val generatedMetadataColumns: mutable.Buffer[Attribute] = mutable.Buffer.empty - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (50006b9715c -> 100056ad1b3)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 50006b9715c [SPARK-42427][SQL][TESTS][FOLLOW-UP] Disable ANSI for several conv test cases in MathFunctionsSuite add 100056ad1b3 [SPARK-41151][FOLLOW-UP][SQL] Improve the doc of `_metadata` generated columns nullability implementation No new revisions were added by this update. Summary of changes: .../spark/sql/catalyst/expressions/namedExpressions.scala | 10 ++ .../spark/sql/execution/datasources/FileSourceStrategy.scala | 8 2 files changed, 18 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-42427][SQL][TESTS][FOLLOW-UP] Disable ANSI for several conv test cases in MathFunctionsSuite
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 78d0b54d301 [SPARK-42427][SQL][TESTS][FOLLOW-UP] Disable ANSI for several conv test cases in MathFunctionsSuite 78d0b54d301 is described below commit 78d0b54d30186148fd28f23e077d91e689a73672 Author: Hyukjin Kwon AuthorDate: Wed Feb 22 20:38:06 2023 +0900 [SPARK-42427][SQL][TESTS][FOLLOW-UP] Disable ANSI for several conv test cases in MathFunctionsSuite ### What changes were proposed in this pull request? This PR proposes to disable ANSI for several conv test cases in `MathFunctionsSuite`. They are intentionally testing the behaviours when ANSI is disabled. Exception cases are already handled in https://github.com/apache/spark/commit/cb463fb40e8f663b7e3019c8d8560a3490c241d0 I believe. ### Why are the changes needed? To make the ANSI tests pass. It currently fails (https://github.com/apache/spark/actions/runs/4228390267/jobs/7343793692): ``` 2023-02-21T03:03:20.3799795Z [0m[[0m[0minfo[0m] [0m[0m[32m- SPARK-33428 conv function should trim input string (177 milliseconds)[0m[0m 2023-02-21T03:03:20.4252604Z 03:03:20.424 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 138.0 (TID 256) 2023-02-21T03:03:20.4253602Z org.apache.spark.SparkArithmeticException: [ARITHMETIC_OVERFLOW] Overflow in function conv(). If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. 
2023-02-21T03:03:20.4254440Zat org.apache.spark.sql.errors.QueryExecutionErrors$.arithmeticOverflowError(QueryExecutionErrors.scala:643) 2023-02-21T03:03:20.4255265Zat org.apache.spark.sql.errors.QueryExecutionErrors$.overflowInConvError(QueryExecutionErrors.scala:315) 2023-02-21T03:03:20.4256001Zat org.apache.spark.sql.catalyst.util.NumberConverter$.encode(NumberConverter.scala:68) 2023-02-21T03:03:20.4256888Zat org.apache.spark.sql.catalyst.util.NumberConverter$.convert(NumberConverter.scala:158) 2023-02-21T03:03:20.4257450Zat org.apache.spark.sql.catalyst.util.NumberConverter.convert(NumberConverter.scala) 2023-02-21T03:03:20.4258084Zat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:38) 2023-02-21T03:03:20.4258720Zat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 2023-02-21T03:03:20.4259293Zat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760) 2023-02-21T03:03:20.4259769Zat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 2023-02-21T03:03:20.4260157Zat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 2023-02-21T03:03:20.4260535Zat org.apache.spark.util.Iterators$.size(Iterators.scala:29) 2023-02-21T03:03:20.4260918Zat org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1944) 2023-02-21T03:03:20.4261283Zat org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1266) 2023-02-21T03:03:20.4261649Zat org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1266) 2023-02-21T03:03:20.4262050Zat org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303) 2023-02-21T03:03:20.4262726Zat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) 2023-02-21T03:03:20.4263206Zat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) 2023-02-21T03:03:20.4263628Zat org.apache.spark.scheduler.Task.run(Task.scala:139) 2023-02-21T03:03:20.4264227Zat 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) 2023-02-21T03:03:20.4265048Zat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1520) 2023-02-21T03:03:20.4266209Zat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) 2023-02-21T03:03:20.4266805Zat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 2023-02-21T03:03:20.4267369Zat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 2023-02-21T03:03:20.4267799Zat java.lang.Thread.run(Thread.java:750) ``` ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Fixed unittests. Closes #40117 from HyukjinKwon/SPARK-42427-followup2. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon (cherry picked from commit 50006b9715c17be7c9ea5809195945dd78418baa) Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/sql/MathFunctionsSuite.scala| 20 1 file changed, 12 insertions(+), 8 deletions(-) diff --git
[spark] branch master updated (00e56905f77 -> 50006b9715c)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 00e56905f77 [SPARK-42516][SQL] Always capture the session time zone config while creating views add 50006b9715c [SPARK-42427][SQL][TESTS][FOLLOW-UP] Disable ANSI for several conv test cases in MathFunctionsSuite No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/MathFunctionsSuite.scala| 20 1 file changed, 12 insertions(+), 8 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.3 updated: [SPARK-42516][SQL] Always capture the session time zone config while creating views
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 82fe3947288 [SPARK-42516][SQL] Always capture the session time zone config while creating views 82fe3947288 is described below commit 82fe39472886da5bf811f158478952cd99044031 Author: Max Gekk AuthorDate: Wed Feb 22 14:03:20 2023 +0300 [SPARK-42516][SQL] Always capture the session time zone config while creating views ### What changes were proposed in this pull request? In the PR, I propose to capture the session time zone config (`spark.sql.session.timeZone`) as a view property, and use it while re-parsing/analysing the view. If the SQL config is not set while creating a view, use the default value of the config. ### Why are the changes needed? To improve user experience with Spark SQL. The current behaviour might confuse users because query results depends on whether or not the session time zone was set explicitly while creating a view. ### Does this PR introduce _any_ user-facing change? Yes. Before the changes, the current value of the session time zone is used in view analysis but this behaviour can be restored via another SQL config `spark.sql.legacy.useCurrentConfigsForView`. ### How was this patch tested? By running the new test via: ``` $ build/sbt "test:testOnly *.PersistedViewTestSuite" ``` Closes #40103 from MaxGekk/view-tz-conf. 
Authored-by: Max Gekk Signed-off-by: Max Gekk (cherry picked from commit 00e56905f77955f67e3809d724b33aebcc79cb5e) Signed-off-by: Max Gekk --- .../org/apache/spark/sql/execution/command/views.scala | 9 - .../apache/spark/sql/execution/SQLViewTestSuite.scala | 17 + 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index eca48a69924..4173db3092a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -398,8 +398,15 @@ object ViewHelper extends SQLConfHelper with Logging { val modifiedConfs = conf.getAllConfs.filter { case (k, _) => conf.isModifiable(k) && shouldCaptureConfig(k) } +// Some configs have dynamic default values, such as SESSION_LOCAL_TIMEZONE whose +// default value relies on the JVM system timezone. We need to always capture them to +// to make sure we apply the same configs when reading the view. 
+val alwaysCaptured = Seq(SQLConf.SESSION_LOCAL_TIMEZONE) + .filter(c => !modifiedConfs.contains(c.key)) + .map(c => (c.key, conf.getConf(c))) + val props = new mutable.HashMap[String, String] -for ((key, value) <- modifiedConfs) { +for ((key, value) <- modifiedConfs ++ alwaysCaptured) { props.put(s"$VIEW_SQL_CONFIG_PREFIX$key", value) } props.toMap diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala index 316b1cfd5e8..1874aa15f8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.CatalogFunction import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.Repartition +import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.withDefaultTimeZone import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} @@ -678,6 +679,22 @@ class PersistedViewTestSuite extends SQLViewTestSuite with SharedSparkSession { } } + test("capture the session time zone config while creating a view") { +val viewName = "v1_capture_test" +withView(viewName) { + assert(get.sessionLocalTimeZone === "America/Los_Angeles") + createView(viewName, +"""select hour(ts) as H from ( + | select cast('2022-01-01T00:00:00.000 America/Los_Angeles' as timestamp) as ts + |)""".stripMargin, Seq("H")) + withDefaultTimeZone(java.time.ZoneId.of("UTC-09:00")) { +withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "UTC-10:00") { + checkAnswer(sql(s"select H from $viewName"), Row(0)) +} + } +} + } + def getShowCreateDDL(view: String, serde: Boolean = false): String = { val result = if (serde) { 
sql(s"SHOW CREATE TABLE $view AS
[spark] branch branch-3.4 updated: [SPARK-42516][SQL] Always capture the session time zone config while creating views
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 033eb37f12e [SPARK-42516][SQL] Always capture the session time zone config while creating views 033eb37f12e is described below commit 033eb37f12e3753ea22eb9a069f716981cc58be1 Author: Max Gekk AuthorDate: Wed Feb 22 14:03:20 2023 +0300 [SPARK-42516][SQL] Always capture the session time zone config while creating views ### What changes were proposed in this pull request? In the PR, I propose to capture the session time zone config (`spark.sql.session.timeZone`) as a view property, and use it while re-parsing/analysing the view. If the SQL config is not set while creating a view, use the default value of the config. ### Why are the changes needed? To improve user experience with Spark SQL. The current behaviour might confuse users because query results depends on whether or not the session time zone was set explicitly while creating a view. ### Does this PR introduce _any_ user-facing change? Yes. Before the changes, the current value of the session time zone is used in view analysis but this behaviour can be restored via another SQL config `spark.sql.legacy.useCurrentConfigsForView`. ### How was this patch tested? By running the new test via: ``` $ build/sbt "test:testOnly *.PersistedViewTestSuite" ``` Closes #40103 from MaxGekk/view-tz-conf. 
Authored-by: Max Gekk Signed-off-by: Max Gekk (cherry picked from commit 00e56905f77955f67e3809d724b33aebcc79cb5e) Signed-off-by: Max Gekk --- .../org/apache/spark/sql/execution/command/views.scala | 9 - .../apache/spark/sql/execution/SQLViewTestSuite.scala | 17 + 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 3ad98fa0d0c..f998e134a0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -398,8 +398,15 @@ object ViewHelper extends SQLConfHelper with Logging { val modifiedConfs = conf.getAllConfs.filter { case (k, _) => conf.isModifiable(k) && shouldCaptureConfig(k) } +// Some configs have dynamic default values, such as SESSION_LOCAL_TIMEZONE whose +// default value relies on the JVM system timezone. We need to always capture them to +// to make sure we apply the same configs when reading the view. 
+val alwaysCaptured = Seq(SQLConf.SESSION_LOCAL_TIMEZONE) + .filter(c => !modifiedConfs.contains(c.key)) + .map(c => (c.key, conf.getConf(c))) + val props = new mutable.HashMap[String, String] -for ((key, value) <- modifiedConfs) { +for ((key, value) <- modifiedConfs ++ alwaysCaptured) { props.put(s"$VIEW_SQL_CONFIG_PREFIX$key", value) } props.toMap diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala index 1d4c52d3ae5..592f1c2607d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.CatalogFunction import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.Repartition +import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.withDefaultTimeZone import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.internal.SQLConf._ @@ -706,6 +707,22 @@ class PersistedViewTestSuite extends SQLViewTestSuite with SharedSparkSession { } } + test("capture the session time zone config while creating a view") { +val viewName = "v1_capture_test" +withView(viewName) { + assert(get.sessionLocalTimeZone === "America/Los_Angeles") + createView(viewName, +"""select hour(ts) as H from ( + | select cast('2022-01-01T00:00:00.000 America/Los_Angeles' as timestamp) as ts + |)""".stripMargin, Seq("H")) + withDefaultTimeZone(java.time.ZoneId.of("UTC-09:00")) { +withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "UTC-10:00") { + checkAnswer(sql(s"select H from $viewName"), Row(0)) +} + } +} + } + def getShowCreateDDL(view: String, serde: Boolean = false): String = { val result = if 
(serde) { sql(s"SHOW CREATE
[spark] branch master updated: [SPARK-42516][SQL] Always capture the session time zone config while creating views
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 00e56905f77 [SPARK-42516][SQL] Always capture the session time zone config while creating views 00e56905f77 is described below commit 00e56905f77955f67e3809d724b33aebcc79cb5e Author: Max Gekk AuthorDate: Wed Feb 22 14:03:20 2023 +0300 [SPARK-42516][SQL] Always capture the session time zone config while creating views ### What changes were proposed in this pull request? In the PR, I propose to capture the session time zone config (`spark.sql.session.timeZone`) as a view property, and use it while re-parsing/analysing the view. If the SQL config is not set while creating a view, use the default value of the config. ### Why are the changes needed? To improve user experience with Spark SQL. The current behaviour might confuse users because query results depends on whether or not the session time zone was set explicitly while creating a view. ### Does this PR introduce _any_ user-facing change? Yes. Before the changes, the current value of the session time zone is used in view analysis but this behaviour can be restored via another SQL config `spark.sql.legacy.useCurrentConfigsForView`. ### How was this patch tested? By running the new test via: ``` $ build/sbt "test:testOnly *.PersistedViewTestSuite" ``` Closes #40103 from MaxGekk/view-tz-conf. 
Authored-by: Max Gekk Signed-off-by: Max Gekk --- .../org/apache/spark/sql/execution/command/views.scala | 9 - .../apache/spark/sql/execution/SQLViewTestSuite.scala | 17 + 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 3ad98fa0d0c..f998e134a0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -398,8 +398,15 @@ object ViewHelper extends SQLConfHelper with Logging { val modifiedConfs = conf.getAllConfs.filter { case (k, _) => conf.isModifiable(k) && shouldCaptureConfig(k) } +// Some configs have dynamic default values, such as SESSION_LOCAL_TIMEZONE whose +// default value relies on the JVM system timezone. We need to always capture them to +// to make sure we apply the same configs when reading the view. 
+val alwaysCaptured = Seq(SQLConf.SESSION_LOCAL_TIMEZONE) + .filter(c => !modifiedConfs.contains(c.key)) + .map(c => (c.key, conf.getConf(c))) + val props = new mutable.HashMap[String, String] -for ((key, value) <- modifiedConfs) { +for ((key, value) <- modifiedConfs ++ alwaysCaptured) { props.put(s"$VIEW_SQL_CONFIG_PREFIX$key", value) } props.toMap diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala index f64be6fcd2c..34dfdf12357 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.CatalogFunction import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.Repartition +import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.withDefaultTimeZone import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.internal.SQLConf._ @@ -714,6 +715,22 @@ class PersistedViewTestSuite extends SQLViewTestSuite with SharedSparkSession { } } + test("capture the session time zone config while creating a view") { +val viewName = "v1_capture_test" +withView(viewName) { + assert(get.sessionLocalTimeZone === "America/Los_Angeles") + createView(viewName, +"""select hour(ts) as H from ( + | select cast('2022-01-01T00:00:00.000 America/Los_Angeles' as timestamp) as ts + |)""".stripMargin, Seq("H")) + withDefaultTimeZone(java.time.ZoneId.of("UTC-09:00")) { +withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "UTC-10:00") { + checkAnswer(sql(s"select H from $viewName"), Row(0)) +} + } +} + } + def getShowCreateDDL(view: String, serde: Boolean = false): String = { val result = if 
(serde) { sql(s"SHOW CREATE TABLE $view AS SERDE") - To unsubscribe,
[spark] branch branch-3.4 updated: [SPARK-42526][ML] Add Classifier.getNumClasses back
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new c0822512655 [SPARK-42526][ML] Add Classifier.getNumClasses back c0822512655 is described below commit c082251265584c442140b152701a58f571048be7 Author: Ruifeng Zheng AuthorDate: Wed Feb 22 19:02:01 2023 +0800 [SPARK-42526][ML] Add Classifier.getNumClasses back ### What changes were proposed in this pull request? Add Classifier.getNumClasses back ### Why are the changes needed? some famous libraries like `xgboost` happen to depend on this method, even though it is not a public API so it should be nice to make xgboost integration better. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? update mima Closes #40119 from zhengruifeng/ml_add_classifier_get_num_class. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng (cherry picked from commit a6098beade01eac5cf92727e69b3537fcac31b2d) Signed-off-by: Ruifeng Zheng --- .../apache/spark/ml/classification/Classifier.scala | 19 +++ .../ml/classification/DecisionTreeClassifier.scala| 2 +- .../ml/classification/RandomForestClassifier.scala| 2 +- project/MimaExcludes.scala| 2 -- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala index 2d7719a29ca..c46be175cb2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala @@ -56,6 +56,25 @@ abstract class Classifier[ M <: ClassificationModel[FeaturesType, M]] extends Predictor[FeaturesType, E, M] with ClassifierParams { + /** + * Get the number of classes. 
This looks in column metadata first, and if that is missing, + * then this assumes classes are indexed 0,1,...,numClasses-1 and computes numClasses + * by finding the maximum label value. + * + * Label validation (ensuring all labels are integers >= 0) needs to be handled elsewhere, + * such as in `extractLabeledPoints()`. + * + * @param dataset Dataset which contains a column [[labelCol]] + * @param maxNumClasses Maximum number of classes allowed when inferred from data. If numClasses + * is specified in the metadata, then maxNumClasses is ignored. + * @return number of classes + * @throws IllegalArgumentException if metadata does not specify numClasses, and the + * actual numClasses exceeds maxNumClasses + */ + protected def getNumClasses(dataset: Dataset[_], maxNumClasses: Int = 100): Int = { +DatasetUtils.getNumClasses(dataset, $(labelCol), maxNumClasses) + } + /** @group setParam */ def setRawPredictionCol(value: String): E = set(rawPredictionCol, value).asInstanceOf[E] diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index 688d2d18f48..7deefda2eea 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -117,7 +117,7 @@ class DecisionTreeClassifier @Since("1.4.0") ( instr.logPipelineStage(this) instr.logDataset(dataset) val categoricalFeatures = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) -val numClasses = getNumClasses(dataset, $(labelCol)) +val numClasses = getNumClasses(dataset) if (isDefined(thresholds)) { require($(thresholds).length == numClasses, this.getClass.getSimpleName + diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala 
index 048e5949e1c..9295425f9d6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala @@ -141,7 +141,7 @@ class RandomForestClassifier @Since("1.4.0") ( instr.logDataset(dataset) val categoricalFeatures: Map[Int, Int] = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) -val numClasses = getNumClasses(dataset, $(labelCol)) +val numClasses = getNumClasses(dataset) if (isDefined(thresholds)) { require($(thresholds).length == numClasses, this.getClass.getSimpleName + diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index
[spark] branch master updated: [SPARK-42526][ML] Add Classifier.getNumClasses back
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a6098beade0 [SPARK-42526][ML] Add Classifier.getNumClasses back a6098beade0 is described below commit a6098beade01eac5cf92727e69b3537fcac31b2d Author: Ruifeng Zheng AuthorDate: Wed Feb 22 19:02:01 2023 +0800 [SPARK-42526][ML] Add Classifier.getNumClasses back ### What changes were proposed in this pull request? Add Classifier.getNumClasses back ### Why are the changes needed? some famous libraries like `xgboost` happen to depend on this method, even though it is not a public API so it should be nice to make xgboost integration better. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? update mima Closes #40119 from zhengruifeng/ml_add_classifier_get_num_class. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../apache/spark/ml/classification/Classifier.scala | 19 +++ .../ml/classification/DecisionTreeClassifier.scala| 2 +- .../ml/classification/RandomForestClassifier.scala| 2 +- project/MimaExcludes.scala| 2 -- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala index 2d7719a29ca..c46be175cb2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala @@ -56,6 +56,25 @@ abstract class Classifier[ M <: ClassificationModel[FeaturesType, M]] extends Predictor[FeaturesType, E, M] with ClassifierParams { + /** + * Get the number of classes. This looks in column metadata first, and if that is missing, + * then this assumes classes are indexed 0,1,...,numClasses-1 and computes numClasses + * by finding the maximum label value. 
+ * + * Label validation (ensuring all labels are integers >= 0) needs to be handled elsewhere, + * such as in `extractLabeledPoints()`. + * + * @param dataset Dataset which contains a column [[labelCol]] + * @param maxNumClasses Maximum number of classes allowed when inferred from data. If numClasses + * is specified in the metadata, then maxNumClasses is ignored. + * @return number of classes + * @throws IllegalArgumentException if metadata does not specify numClasses, and the + * actual numClasses exceeds maxNumClasses + */ + protected def getNumClasses(dataset: Dataset[_], maxNumClasses: Int = 100): Int = { +DatasetUtils.getNumClasses(dataset, $(labelCol), maxNumClasses) + } + /** @group setParam */ def setRawPredictionCol(value: String): E = set(rawPredictionCol, value).asInstanceOf[E] diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index 688d2d18f48..7deefda2eea 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -117,7 +117,7 @@ class DecisionTreeClassifier @Since("1.4.0") ( instr.logPipelineStage(this) instr.logDataset(dataset) val categoricalFeatures = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) -val numClasses = getNumClasses(dataset, $(labelCol)) +val numClasses = getNumClasses(dataset) if (isDefined(thresholds)) { require($(thresholds).length == numClasses, this.getClass.getSimpleName + diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala index 048e5949e1c..9295425f9d6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala +++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala @@ -141,7 +141,7 @@ class RandomForestClassifier @Since("1.4.0") ( instr.logDataset(dataset) val categoricalFeatures: Map[Int, Int] = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) -val numClasses = getNumClasses(dataset, $(labelCol)) +val numClasses = getNumClasses(dataset) if (isDefined(thresholds)) { require($(thresholds).length == numClasses, this.getClass.getSimpleName + diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 70a7c29b8dc..9741e53452a 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -55,8 +55,6 @@ object