[spark] branch master updated: [SPARK-41571][SQL] Assign name to _LEGACY_ERROR_TEMP_2310
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 470beda2231 [SPARK-41571][SQL] Assign name to _LEGACY_ERROR_TEMP_2310
470beda2231 is described below

commit 470beda2231c89d9cbd609bcf1e83d84c80a7f06
Author: itholic
AuthorDate: Mon Jan 2 11:53:27 2023 +0500

    [SPARK-41571][SQL] Assign name to _LEGACY_ERROR_TEMP_2310

    ### What changes were proposed in this pull request?

    This PR proposes to assign the name "WRITE_STREAM_NOT_ALLOWED" to _LEGACY_ERROR_TEMP_2310.

    ### Why are the changes needed?

    We should assign proper names to the _LEGACY_ERROR_TEMP_* error classes.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    `./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`

    Closes #39285 from itholic/LEGACY_2310.

    Authored-by: itholic
    Signed-off-by: Max Gekk
---
 R/pkg/tests/fulltests/test_streaming.R                         |  3 +--
 core/src/main/resources/error/error-classes.json               | 10 +-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala     |  2 +-
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala |  8 +---
 4 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_streaming.R b/R/pkg/tests/fulltests/test_streaming.R
index cc84a985423..8804471e640 100644
--- a/R/pkg/tests/fulltests/test_streaming.R
+++ b/R/pkg/tests/fulltests/test_streaming.R
@@ -140,8 +140,7 @@ test_that("Non-streaming DataFrame", {
   expect_false(isStreaming(c))
 
   expect_error(write.stream(c, "memory", queryName = "people", outputMode = "complete"),
-               paste0(".*(writeStream : analysis error - 'writeStream' can be called only on ",
-                      "streaming Dataset/DataFrame).*"))
+               paste0("Error in writeStream : analysis error - \\[WRITE_STREAM_NOT_ALLOWED\\].*"))
 })
 
 test_that("Unsupported operation", {
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 4003fab0685..4687d04bf71 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1618,6 +1618,11 @@
     ],
     "sqlState" : "42000"
   },
+  "WRITE_STREAM_NOT_ALLOWED" : {
+    "message" : [
+      "`writeStream` can be called only on streaming Dataset/DataFrame."
+    ]
+  },
   "WRONG_NUM_ARGS" : {
     "message" : [
       "Invalid number of arguments for the function ."
@@ -4907,11 +4912,6 @@
       "cannot resolve in MERGE command given columns []"
     ]
   },
-  "_LEGACY_ERROR_TEMP_2310" : {
-    "message" : [
-      "'writeStream' can be called only on streaming Dataset/DataFrame"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2311" : {
     "message" : [
       "'writeTo' can not be called on streaming Dataset/DataFrame"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5f6512d4e4b..c8e2a48859d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3875,7 +3875,7 @@ class Dataset[T] private[sql](
   def writeStream: DataStreamWriter[T] = {
     if (!isStreaming) {
       logicalPlan.failAnalysis(
-        errorClass = "_LEGACY_ERROR_TEMP_2310",
+        errorClass = "WRITE_STREAM_NOT_ALLOWED",
         messageParameters = Map.empty)
     }
     new DataStreamWriter[T](this)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 3f2414d2178..17a003dfe8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -162,9 +162,11 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
         .writeStream
         .start()
     }
-    Seq("'writeStream'", "only", "streaming Dataset/DataFrame").foreach { s =>
-      assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
-    }
+    checkError(
+      exception = e,
+      errorClass = "WRITE_STREAM_NOT_ALLOWED",
+      parameters = Map.empty
+    )
   }
 
   test("resolve default source") {
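For context, the renamed error is what users now see when calling writeStream on a batch DataFrame. A minimal PySpark sketch of triggering it (assuming a local build that includes this change; the message check is illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.range(10)   # a batch, non-streaming DataFrame
assert not df.isStreaming

try:
    df.writeStream     # property access runs the analysis check
except AnalysisException as e:
    # With this change, the message carries the new error class name.
    assert "WRITE_STREAM_NOT_ALLOWED" in str(e)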
[spark] branch master updated (d9c604ec932 -> a3ae6b04ba1)
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


    from d9c604ec932 [SPARK-41066][CONNECT][PYTHON] Implement `DataFrame.sampleBy` and `DataFrame.stat.sampleBy`
     add a3ae6b04ba1 [SPARK-41808][CONNECT][PYTHON] Make JSON functions support options

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/functions.py        | 23 ++---
 .../sql/tests/connect/test_connect_function.py | 24 ++
 2 files changed, 40 insertions(+), 7 deletions(-)
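The change mirrors the classic PySpark signature, where from_json accepts an options dict that is forwarded to the JSON parser. A small sketch of that existing API (allowUnquotedFieldNames is one of the standard JSON datasource options):

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame([('{a: 1}',)], ["json"])

# Without allowUnquotedFieldNames the unquoted key would fail to parse.
parsed = df.select(
    from_json("json", "a INT", {"allowUnquotedFieldNames": "true"}).alias("s"))
parsed.select("s.a").show()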
[spark] branch master updated: [SPARK-41066][CONNECT][PYTHON] Implement `DataFrame.sampleBy` and `DataFrame.stat.sampleBy`
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9c604ec932 [SPARK-41066][CONNECT][PYTHON] Implement `DataFrame.sampleBy` and `DataFrame.stat.sampleBy`
d9c604ec932 is described below

commit d9c604ec9322117fce0c9b3302c3cd73f5d16df7
Author: Ruifeng Zheng
AuthorDate: Mon Jan 2 09:31:21 2023 +0900

    [SPARK-41066][CONNECT][PYTHON] Implement `DataFrame.sampleBy` and `DataFrame.stat.sampleBy`

    ### What changes were proposed in this pull request?

    Implement `DataFrame.sampleBy` and `DataFrame.stat.sampleBy`.

    ### Why are the changes needed?

    For API coverage.

    ### Does this PR introduce _any_ user-facing change?

    Yes.

    ### How was this patch tested?

    Added UT.

    Closes #39328 from zhengruifeng/connect_df_sampleby.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Hyukjin Kwon
---
 .../main/protobuf/spark/connect/relations.proto     |  29 +++
 .../sql/connect/planner/SparkConnectPlanner.scala   |  24 ++-
 python/pyspark/sql/connect/dataframe.py             |  27 +++
 python/pyspark/sql/connect/plan.py                  |  44 +
 python/pyspark/sql/connect/proto/relations_pb2.py   | 219 -
 python/pyspark/sql/connect/proto/relations_pb2.pyi  |  97 +
 .../sql/tests/connect/test_connect_basic.py         |  28 +++
 7 files changed, 371 insertions(+), 97 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index db3565eda61..2d834f3fd8c 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -74,6 +74,7 @@ message Relation {
     StatCorr corr = 104;
     StatApproxQuantile approx_quantile = 105;
     StatFreqItems freq_items = 106;
+    StatSampleBy sample_by = 107;
 
     // Catalog API (experimental / unstable)
     Catalog catalog = 200;
@@ -546,6 +547,34 @@ message StatFreqItems {
   optional double support = 3;
 }
 
+
+// Returns a stratified sample without replacement based on the fraction
+// given on each stratum.
+message StatSampleBy {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) The column that defines strata.
+  Expression col = 2;
+
+  // (Required) Sampling fraction for each stratum.
+  //
+  // If a stratum is not specified, we treat its fraction as zero.
+  repeated Fraction fractions = 3;
+
+  // (Optional) The random seed.
+  optional int64 seed = 5;
+
+  message Fraction {
+    // (Required) The stratum.
+    Expression.Literal stratum = 1;
+
+    // (Required) The fraction value. Must be in [0, 1].
+    double fraction = 2;
+  }
+}
+
+
 // Replaces null values.
 // It will invoke 'Dataset.na.fill' (same as 'DataFrameNaFunctions.fill') to compute the results.
 // Following 3 parameter combinations are supported:
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dcfdc3f8b52..d7e2908a1c5 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -98,6 +98,8 @@ class SparkConnectPlanner(session: SparkSession) {
       case proto.Relation.RelTypeCase.CROSSTAB =>
         transformStatCrosstab(rel.getCrosstab)
       case proto.Relation.RelTypeCase.FREQ_ITEMS => transformStatFreqItems(rel.getFreqItems)
+      case proto.Relation.RelTypeCase.SAMPLE_BY =>
+        transformStatSampleBy(rel.getSampleBy)
       case proto.Relation.RelTypeCase.TO_SCHEMA => transformToSchema(rel.getToSchema)
       case proto.Relation.RelTypeCase.RENAME_COLUMNS_BY_SAME_LENGTH_NAMES =>
         transformRenameColumnsBySamelenghtNames(rel.getRenameColumnsBySameLengthNames)
@@ -419,6 +421,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformStatSampleBy(rel: proto.StatSampleBy): LogicalPlan = {
+    val fractions = mutable.Map.empty[Any, Double]
+    rel.getFractionsList.asScala.toSeq.foreach { protoFraction =>
+      val stratum = transformLiteral(protoFraction.getStratum) match {
+        case Literal(s, StringType) if s != null => s.toString
+        case literal => literal.value
+      }
+      fractions.update(stratum, protoFraction.getFraction)
+    }
+
+    Dataset
+      .ofRows(session, transformRelation(rel.getInput))
+      .stat
+      .sampleBy(
+        col = Column(transformExpression(rel.getCol)),
+        fractions = fractions.toMap,
+        seed = if (rel.hasSeed)
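For readers following along, the server-side planner above ultimately calls the same stat function exposed in classic PySpark. A quick sketch of the user-facing API (per the proto comment, strata missing from the fractions map default to zero):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.range(0, 100).selectExpr("id % 3 AS key", "id AS value")

# Keep ~10% of key=0 rows and ~20% of key=1 rows; key=2 defaults to 0.0.
sampled = df.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
sampled.groupBy("key").count().show()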
[spark] branch master updated: [SPARK-41792][SHUFFLE] Fix DB update for push based shuffle when newer shuffle merge is received
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d6b69fbc46 [SPARK-41792][SHUFFLE] Fix DB update for push based shuffle when newer shuffle merge is received
5d6b69fbc46 is described below

commit 5d6b69fbc46ea52852dbe86f144ab87495af64a8
Author: Mridul Muralidharan
AuthorDate: Sun Jan 1 13:41:35 2023 -0600

    [SPARK-41792][SHUFFLE] Fix DB update for push based shuffle when newer shuffle merge is received

    ### What changes were proposed in this pull request?

    Fix the merge id that is removed from the DB when a newer shuffle merge id is received; previously an incorrect merge id was deleted.

    ### Why are the changes needed?

    Bug fix.

    ### Does this PR introduce _any_ user-facing change?

    No, it fixes a corner-case bug.

    ### How was this patch tested?

    Unit test updated.

    Closes #39316 from mridulm/SPARK-41792.

    Authored-by: Mridul Muralidharan
    Signed-off-by: Mridul Muralidharan
---
 .../network/shuffle/RemoteBlockPushResolver.java      | 18 ++++++------
 .../network/shuffle/RemoteBlockPushResolverSuite.java | 19 +++++++++++
 .../spark/network/yarn/YarnShuffleServiceSuite.scala  |  2 +-
 3 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 816d1082850..c3a2e9a883a 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -227,15 +227,15 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           // Higher shuffleMergeId seen for the shuffle ID meaning new stage attempt is being
           // run for the shuffle ID. Close and clean up old shuffleMergeId files,
           // happens in the indeterminate stage retries
-          AppAttemptShuffleMergeId appAttemptShuffleMergeId =
-              new AppAttemptShuffleMergeId(
-                  appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId);
+          AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+              new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId,
+                  shuffleId, latestShuffleMergeId);
           logger.info("{}: creating a new shuffle merge metadata since received " +
-              "shuffleMergeId is higher than latest shuffleMergeId {}",
-              appAttemptShuffleMergeId, latestShuffleMergeId);
+              "shuffleMergeId {} is higher than latest shuffleMergeId {}",
+              currentAppAttemptShuffleMergeId, shuffleMergeId, latestShuffleMergeId);
           submitCleanupTask(() ->
-              closeAndDeleteOutdatedPartitions(
-                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+              closeAndDeleteOutdatedPartitions(currentAppAttemptShuffleMergeId,
+                  mergePartitionsInfo.shuffleMergePartitions));
           return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
         } else {
           // The request is for block with same shuffleMergeId as the latest shuffleMergeId
@@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
+          AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
           submitCleanupTask(() ->
               closeAndDeleteOutdatedPartitions(
-                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+                  currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
         } else {
           // This block covers:
           // 1. finalization of determinate stage
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index eb2c1d9fa5c..6a595ee346d 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -28,7 +28,9 @@
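The fix is easier to see stripped of the Java boilerplate: the asynchronous cleanup must capture the id of the merge being retired, not the newly received one. A hypothetical Python sketch of that idea (all names here are illustrative, not the actual API):

def close_and_delete_outdated(merge_id, partitions):
    # Stand-in for closeAndDeleteOutdatedPartitions: closes partition files
    # and removes merge_id's entry from the DB.
    print("cleaned up", merge_id, "with", len(partitions), "partitions")

def on_new_merge_id(app_id, attempt_id, shuffle_id,
                    incoming_merge_id, latest_merge_id,
                    partitions, submit_cleanup_task):
    if incoming_merge_id > latest_merge_id:
        # The bug: building this id with incoming_merge_id deleted the
        # wrong DB entry. The fix captures the *outdated* merge id.
        outdated = (app_id, attempt_id, shuffle_id, latest_merge_id)
        submit_cleanup_task(
            lambda: close_and_delete_outdated(outdated, partitions))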
[spark] branch master updated: [SPARK-41796][TESTS] Test the error class: UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b0751ed22b9 [SPARK-41796][TESTS] Test the error class: UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE
b0751ed22b9 is described below

commit b0751ed22b94a93a5a60a20b24a88ca77d67c694
Author: panbingkun
AuthorDate: Sun Jan 1 21:45:56 2023 +0500

    [SPARK-41796][TESTS] Test the error class: UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE

    ### What changes were proposed in this pull request?

    This PR aims to modify a test for the error class UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE in SubquerySuite.

    ### Why are the changes needed?

    The changes improve test coverage and document the expected error messages in tests.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass GA. Updated the existing UT.

    Closes #39320 from panbingkun/SPARK-41796.

    Authored-by: panbingkun
    Signed-off-by: Max Gekk
---
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 20 ++--
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 65dd911df31..3d4a629f7a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2452,16 +2452,24 @@ class SubquerySuite extends QueryTest
       Row(2))
 
     // Cannot use non-orderable data type in one row subquery that cannot be collapsed.
-    val error = intercept[AnalysisException] {
+    checkError(
+      exception = intercept[AnalysisException] {
         sql(
-          """
-            |select (
+          """select (
             |  select concat(a, a) from
            |  (select upper(x['a'] + rand()) as a)
             |) from v1
-            |""".stripMargin).collect()
-    }
-    assert(error.getMessage.contains("Correlated column reference 'v1.x' cannot be map type"))
+            |""".stripMargin
+        ).collect()
+      },
+      errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+        "UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE",
+      parameters = Map("expr" -> "v1.x", "dataType" -> "map"),
+      context = ExpectedContext(
+        fragment = "select upper(x['a'] + rand()) as a",
+        start = 39,
+        stop = 72)
+    )
  }
}
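To see the error class outside the suite, one can run the same query shape from PySpark. A sketch assuming a view v1 with a map-typed column x, as in SubquerySuite (the view definition here is an assumption, and the exact analysis error surfaced can vary by version):

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.sql("CREATE OR REPLACE TEMP VIEW v1 AS SELECT map('a', '1') AS x")

try:
    spark.sql("""
        select (
          select concat(a, a) from
          (select upper(x['a'] + rand()) as a)
        ) from v1
    """).collect()
except AnalysisException as e:
    # The correlated reference v1.x has map type, which is not orderable.
    assert "UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE" in str(e)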
[spark] branch master updated (8cfa748a78e -> b1a1a3ccdef)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


    from 8cfa748a78e [SPARK-41742][SPARK-41745][CONNECT] Reenable doc tests and add missing column alias to count()
     add b1a1a3ccdef [SPARK-41493][CONNECT][PYTHON] Make csv functions support options

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/functions.py        | 43 ++
 .../sql/tests/connect/test_connect_function.py | 26 +
 2 files changed, 61 insertions(+), 8 deletions(-)
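As with the JSON change above, this aligns the Connect client with the classic from_csv signature, whose options dict reaches the CSV parser. A brief sketch of that existing behavior (sep is a standard CSV datasource option):

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_csv

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame([("1;2",)], ["csv"])

# With the default comma separator the row would parse as a single field.
parsed = df.select(from_csv("csv", "a INT, b INT", {"sep": ";"}).alias("s"))
parsed.select("s.a", "s.b").show()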
[spark] branch master updated (b6ff4aa237c -> 8cfa748a78e)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


    from b6ff4aa237c [SPARK-41799][CONNECT][PYTHON][TESTS] Combine plan-related tests into single file
     add 8cfa748a78e [SPARK-41742][SPARK-41745][CONNECT] Reenable doc tests and add missing column alias to count()

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/group.py |  2 +-
 python/pyspark/sql/group.py         | 21 +++--
 2 files changed, 8 insertions(+), 15 deletions(-)
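The missing alias matters for anything that refers to the aggregate column by name: in classic PySpark, GroupedData.count() already yields a column named "count", which the Connect client now matches. A minimal illustration of that existing behavior:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame([("a",), ("a",), ("b",)], ["key"])

counts = df.groupBy("key").count()
assert counts.columns == ["key", "count"]   # the alias under discussion
counts.orderBy("key").show()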
[spark] branch master updated: [SPARK-41799][CONNECT][PYTHON][TESTS] Combine plan-related tests into single file
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b6ff4aa237c [SPARK-41799][CONNECT][PYTHON][TESTS] Combine plan-related tests into single file
b6ff4aa237c is described below

commit b6ff4aa237cd4dcce20d6244295d038a7d3cfab7
Author: Ruifeng Zheng
AuthorDate: Sun Jan 1 16:06:50 2023 +0800

    [SPARK-41799][CONNECT][PYTHON][TESTS] Combine plan-related tests into single file

    ### What changes were proposed in this pull request?

    Combine plan-related tests into a single file.

    ### Why are the changes needed?

    1. `test_connect_column_expressions`, `test_connect_plan_only`, and `test_connect_select_ops` all did almost the same thing: generate and then validate a plan.
    2. The three tests are pretty small and normally finish in about 1 second:

    Starting test(python3.9): pyspark.sql.tests.connect.test_connect_column_expressions (temp output: /__w/spark/spark/python/target/8d15e343-028e-44de-b998-6a7e7cc98047/python3.9__pyspark.sql.tests.connect.test_connect_column_expressions__fkvdhn26.log)
    Finished test(python3.9): pyspark.sql.tests.connect.test_connect_column_expressions (0s)
    Starting test(python3.9): pyspark.sql.tests.connect.test_connect_function (temp output: /__w/spark/spark/python/target/b35958dc-4dd3-4420-8f44-cb6f66d568dc/python3.9__pyspark.sql.tests.connect.test_connect_function__te14hoiz.log)
    Finished test(python3.9): pyspark.sql.tests.connect.test_connect_function (80s)
    Starting test(python3.9): pyspark.sql.tests.connect.test_connect_plan_only (temp output: /__w/spark/spark/python/target/3225a48d-5b4c-4cbe-803d-680c9408e3a8/python3.9__pyspark.sql.tests.connect.test_connect_plan_only__4bjohyey.log)
    Finished test(python3.9): pyspark.sql.tests.connect.test_connect_plan_only (0s)
    Starting test(python3.9): pyspark.sql.tests.connect.test_connect_select_ops (temp output: /__w/spark/spark/python/target/fe6a37ff-9aa8-41d5-8204-44a86423381f/python3.9__pyspark.sql.tests.connect.test_connect_select_ops__cicvg0w7.log)
    Finished test(python3.9): pyspark.sql.tests.connect.test_connect_select_ops (0s)

    ### Does this PR introduce _any_ user-facing change?

    No, test-only.

    ### How was this patch tested?

    CI.

    Closes #39323 from zhengruifeng/connect_test_reorg.
    Authored-by: Ruifeng Zheng
    Signed-off-by: Ruifeng Zheng
---
 dev/sparktestsupport/modules.py                    |   4 +-
 .../sql/tests/connect/test_connect_basic.py        |  24 ++-
 .../sql/tests/connect/test_connect_column.py       |   2 +-
 .../connect/test_connect_column_expressions.py     | 195 --
 ...t_connect_plan_only.py => test_connect_plan.py} | 225 ++---
 .../sql/tests/connect/test_connect_select_ops.py   |  71 ---
 6 files changed, 225 insertions(+), 296 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index df3a1f180fc..dff17792148 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -509,9 +509,7 @@ pyspark_connect = Module(
         "pyspark.sql.connect.window",
         "pyspark.sql.connect.column",
         # unittests
-        "pyspark.sql.tests.connect.test_connect_column_expressions",
-        "pyspark.sql.tests.connect.test_connect_plan_only",
-        "pyspark.sql.tests.connect.test_connect_select_ops",
+        "pyspark.sql.tests.connect.test_connect_plan",
         "pyspark.sql.tests.connect.test_connect_basic",
         "pyspark.sql.tests.connect.test_connect_function",
         "pyspark.sql.tests.connect.test_connect_column",
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 6cdef25d5bc..0b615d2e32a 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -117,7 +117,7 @@ class SparkConnectSQLTestCase(PandasOnSparkTestCase, ReusedPySparkTestCase, SQLTestUtils):
         cls.spark.sql("DROP TABLE IF EXISTS {}".format(cls.tbl_name_empty))
 
 
-class SparkConnectTests(SparkConnectSQLTestCase):
+class SparkConnectBasicTests(SparkConnectSQLTestCase):
     def test_df_get_item(self):
         # SPARK-41779: test __getitem__
@@ -1746,6 +1746,28 @@ class SparkConnectTests(SparkConnectSQLTestCase):
         ):
             cdf.groupBy("name").pivot("department").sum("salary", "department").show()
 
+    def test_unsupported_functions(self):
+        # SPARK-41225: Disable unsupported functions.
+        df = self.connect.read.table(self.tbl_name)
+        for f in (
+            "rdd",
+            "unpersist",
+            "cache",
+            "persist",
+            "withWatermark",
+            "observe",
+            "foreach",
+            "foreachPartition",
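The truncated test above checks that APIs not yet implemented for Spark Connect fail loudly. A hedged, standalone rendering of that check (connect_df stands in for a Spark Connect DataFrame built via self.connect.read.table(...) in the real suite; the assumption is that each listed API raises NotImplementedError when invoked):

import unittest

class UnsupportedFunctionsSketch(unittest.TestCase):
    def check_unsupported(self, connect_df):
        for name in ("rdd", "unpersist", "cache", "persist", "withWatermark",
                     "observe", "foreach", "foreachPartition"):
            with self.assertRaises(NotImplementedError):
                # Each unsupported API is expected to raise on invocation.
                getattr(connect_df, name)()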