[spark] branch master updated: [SPARK-41571][SQL] Assign name to _LEGACY_ERROR_TEMP_2310

2023-01-01 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 470beda2231 [SPARK-41571][SQL] Assign name to _LEGACY_ERROR_TEMP_2310
470beda2231 is described below

commit 470beda2231c89d9cbd609bcf1e83d84c80a7f06
Author: itholic 
AuthorDate: Mon Jan 2 11:53:27 2023 +0500

[SPARK-41571][SQL] Assign name to _LEGACY_ERROR_TEMP_2310

### What changes were proposed in this pull request?

This PR proposes to assign the name "WRITE_STREAM_NOT_ALLOWED" to the legacy
error class _LEGACY_ERROR_TEMP_2310.

### Why are the changes needed?

We should assign proper names to the temporary _LEGACY_ERROR_TEMP_* error classes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`

Closes #39285 from itholic/LEGACY_2310.

Authored-by: itholic 
Signed-off-by: Max Gekk 
---
 R/pkg/tests/fulltests/test_streaming.R |  3 +--
 core/src/main/resources/error/error-classes.json   | 10 +-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala |  2 +-
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala |  8 +---
 4 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_streaming.R 
b/R/pkg/tests/fulltests/test_streaming.R
index cc84a985423..8804471e640 100644
--- a/R/pkg/tests/fulltests/test_streaming.R
+++ b/R/pkg/tests/fulltests/test_streaming.R
@@ -140,8 +140,7 @@ test_that("Non-streaming DataFrame", {
   expect_false(isStreaming(c))
 
   expect_error(write.stream(c, "memory", queryName = "people", outputMode = 
"complete"),
-   paste0(".*(writeStream : analysis error - 'writeStream' can be 
called only on ",
-  "streaming Dataset/DataFrame).*"))
+   paste0("Error in writeStream : analysis error - 
\\[WRITE_STREAM_NOT_ALLOWED\\].*"))
 })
 
 test_that("Unsupported operation", {
diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 4003fab0685..4687d04bf71 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1618,6 +1618,11 @@
 ],
 "sqlState" : "42000"
   },
+  "WRITE_STREAM_NOT_ALLOWED" : {
+"message" : [
+  "`writeStream` can be called only on streaming Dataset/DataFrame."
+]
+  },
   "WRONG_NUM_ARGS" : {
 "message" : [
   "Invalid number of arguments for the function ."
@@ -4907,11 +4912,6 @@
   "cannot resolve  in MERGE command given columns []"
 ]
   },
-  "_LEGACY_ERROR_TEMP_2310" : {
-"message" : [
-  "'writeStream' can be called only on streaming Dataset/DataFrame"
-]
-  },
   "_LEGACY_ERROR_TEMP_2311" : {
 "message" : [
   "'writeTo' can not be called on streaming Dataset/DataFrame"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5f6512d4e4b..c8e2a48859d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3875,7 +3875,7 @@ class Dataset[T] private[sql](
   def writeStream: DataStreamWriter[T] = {
 if (!isStreaming) {
   logicalPlan.failAnalysis(
-errorClass = "_LEGACY_ERROR_TEMP_2310",
+errorClass = "WRITE_STREAM_NOT_ALLOWED",
 messageParameters = Map.empty)
 }
 new DataStreamWriter[T](this)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 3f2414d2178..17a003dfe8f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -162,9 +162,11 @@ class DataFrameReaderWriterSuite extends QueryTest with 
SharedSparkSession with
 .writeStream
 .start()
 }
-Seq("'writeStream'", "only", "streaming Dataset/DataFrame").foreach { s =>
-  
assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
-}
+checkError(
+  exception = e,
+  errorClass = "WRITE_STREAM_NOT_ALLOWED",
+  parameters = Map.empty
+)
   }
 
   test("resolve default source") {





[spark] branch master updated (d9c604ec932 -> a3ae6b04ba1)

2023-01-01 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from d9c604ec932 [SPARK-41066][CONNECT][PYTHON] Implement 
`DataFrame.sampleBy ` and `DataFrame.stat.sampleBy `
 add a3ae6b04ba1 [SPARK-41808][CONNECT][PYTHON] Make JSON functions support 
options

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/functions.py| 23 ++---
 .../sql/tests/connect/test_connect_function.py | 24 ++
 2 files changed, 40 insertions(+), 7 deletions(-)





[spark] branch master updated: [SPARK-41066][CONNECT][PYTHON] Implement `DataFrame.sampleBy ` and `DataFrame.stat.sampleBy `

2023-01-01 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d9c604ec932 [SPARK-41066][CONNECT][PYTHON] Implement 
`DataFrame.sampleBy ` and `DataFrame.stat.sampleBy `
d9c604ec932 is described below

commit d9c604ec9322117fce0c9b3302c3cd73f5d16df7
Author: Ruifeng Zheng 
AuthorDate: Mon Jan 2 09:31:21 2023 +0900

[SPARK-41066][CONNECT][PYTHON] Implement `DataFrame.sampleBy ` and 
`DataFrame.stat.sampleBy `

### What changes were proposed in this pull request?
 Implement `DataFrame.sampleBy` and `DataFrame.stat.sampleBy`.
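
For reference, a small usage sketch (assuming a local SparkSession; data and names are illustrative) of the underlying `DataFrameStatFunctions.sampleBy` API that the new Connect relation maps onto in the planner change below:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Illustrative only: stratified sampling by the "key" column.
val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val df = Seq((0, "a"), (0, "b"), (1, "c"), (1, "d")).toDF("key", "value")

// Keep ~10% of rows with key = 0 and ~50% of rows with key = 1;
// strata missing from the map are treated as fraction 0.
val sampled = df.stat.sampleBy(col("key"), Map(0 -> 0.1, 1 -> 0.5), seed = 36L)
sampled.show()
```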

### Why are the changes needed?
For API coverage

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
added UT

Closes #39328 from zhengruifeng/connect_df_sampleby.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .../main/protobuf/spark/connect/relations.proto|  29 +++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  24 ++-
 python/pyspark/sql/connect/dataframe.py|  27 +++
 python/pyspark/sql/connect/plan.py |  44 +
 python/pyspark/sql/connect/proto/relations_pb2.py  | 219 -
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  97 +
 .../sql/tests/connect/test_connect_basic.py|  28 +++
 7 files changed, 371 insertions(+), 97 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index db3565eda61..2d834f3fd8c 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -74,6 +74,7 @@ message Relation {
 StatCorr corr = 104;
 StatApproxQuantile approx_quantile = 105;
 StatFreqItems freq_items = 106;
+StatSampleBy sample_by = 107;
 
 // Catalog API (experimental / unstable)
 Catalog catalog = 200;
@@ -546,6 +547,34 @@ message StatFreqItems {
   optional double support = 3;
 }
 
+
+// Returns a stratified sample without replacement based on the fraction
+// given on each stratum.
+message StatSampleBy {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) The column that defines strata.
+  Expression col = 2;
+
+  // (Required) Sampling fraction for each stratum.
+  //
+  // If a stratum is not specified, we treat its fraction as zero.
+  repeated Fraction fractions = 3;
+
+  // (Optional) The random seed.
+  optional int64 seed = 5;
+
+  message Fraction {
+// (Required) The stratum.
+Expression.Literal stratum = 1;
+
+// (Required) The fraction value. Must be in [0, 1].
+double fraction = 2;
+  }
+}
+
+
 // Replaces null values.
 // It will invoke 'Dataset.na.fill' (same as 'DataFrameNaFunctions.fill') to 
compute the results.
 // Following 3 parameter combinations are supported:
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dcfdc3f8b52..d7e2908a1c5 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -98,6 +98,8 @@ class SparkConnectPlanner(session: SparkSession) {
   case proto.Relation.RelTypeCase.CROSSTAB =>
 transformStatCrosstab(rel.getCrosstab)
   case proto.Relation.RelTypeCase.FREQ_ITEMS => 
transformStatFreqItems(rel.getFreqItems)
+  case proto.Relation.RelTypeCase.SAMPLE_BY =>
+transformStatSampleBy(rel.getSampleBy)
   case proto.Relation.RelTypeCase.TO_SCHEMA => 
transformToSchema(rel.getToSchema)
   case proto.Relation.RelTypeCase.RENAME_COLUMNS_BY_SAME_LENGTH_NAMES =>
 
transformRenameColumnsBySamelenghtNames(rel.getRenameColumnsBySameLengthNames)
@@ -419,6 +421,26 @@ class SparkConnectPlanner(session: SparkSession) {
 }
   }
 
+  private def transformStatSampleBy(rel: proto.StatSampleBy): LogicalPlan = {
+val fractions = mutable.Map.empty[Any, Double]
+rel.getFractionsList.asScala.toSeq.foreach { protoFraction =>
+  val stratum = transformLiteral(protoFraction.getStratum) match {
+case Literal(s, StringType) if s != null => s.toString
+case literal => literal.value
+  }
+  fractions.update(stratum, protoFraction.getFraction)
+}
+
+Dataset
+  .ofRows(session, transformRelation(rel.getInput))
+  .stat
+  .sampleBy(
+col = Column(transformExpression(rel.getCol)),
+fractions = fractions.toMap,
+seed = if (rel.hasSeed) 

[spark] branch master updated: [SPARK-41792][SHUFFLE] Fix DB update for push based shuffle when newer shuffle merge is received

2023-01-01 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5d6b69fbc46 [SPARK-41792][SHUFFLE] Fix DB update for push based 
shuffle when newer shuffle merge is received
5d6b69fbc46 is described below

commit 5d6b69fbc46ea52852dbe86f144ab87495af64a8
Author: Mridul Muralidharan 
AuthorDate: Sun Jan 1 13:41:35 2023 -0600

[SPARK-41792][SHUFFLE] Fix DB update for push based shuffle when newer 
shuffle merge is received

### What changes were proposed in this pull request?
Previously, the wrong merge id was removed from the DB when a newer shuffle
merge id was received: the cleanup was keyed by the incoming shuffle merge id
instead of the outdated one being replaced. This PR builds the cleanup key from
the outdated (latest known) shuffle merge id.
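
A minimal Scala sketch (the actual code lives in the Java `RemoteBlockPushResolver`; names here are illustrative) of the keying change, where cleanup must target the entry keyed by the latest known, now outdated, shuffleMergeId rather than the newly received one:

```scala
// Illustrative key type mirroring AppAttemptShuffleMergeId.
case class MergeKey(appId: String, attemptId: Int, shuffleId: Int, shuffleMergeId: Int)

// When a push arrives with receivedMergeId > latestMergeId, remove the DB
// entry for the outdated merge. Before the fix, the key was built from
// receivedMergeId, so the stale entry (keyed by latestMergeId) stayed behind.
def cleanupOutdatedMerge(
    db: scala.collection.mutable.Map[MergeKey, Array[Byte]],
    appId: String, attemptId: Int, shuffleId: Int,
    receivedMergeId: Int, latestMergeId: Int): Unit = {
  val outdatedKey = MergeKey(appId, attemptId, shuffleId, latestMergeId)
  db.remove(outdatedKey)
}
```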

### Why are the changes needed?
Bug fix

### Does this PR introduce _any_ user-facing change?
No, it fixes a corner-case bug.

### How was this patch tested?
Unit test updated

Closes #39316 from mridulm/SPARK-41792.

Authored-by: Mridul Muralidharan 
Signed-off-by: Mridul Muralidharan 
---
 .../network/shuffle/RemoteBlockPushResolver.java  | 18 ++
 .../network/shuffle/RemoteBlockPushResolverSuite.java | 19 +++
 .../spark/network/yarn/YarnShuffleServiceSuite.scala  |  2 +-
 3 files changed, 30 insertions(+), 9 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 816d1082850..c3a2e9a883a 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -227,15 +227,15 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 // Higher shuffleMergeId seen for the shuffle ID meaning new stage 
attempt is being
 // run for the shuffle ID. Close and clean up old shuffleMergeId 
files,
 // happens in the indeterminate stage retries
-AppAttemptShuffleMergeId appAttemptShuffleMergeId =
-new AppAttemptShuffleMergeId(
-appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, 
shuffleMergeId);
+AppAttemptShuffleMergeId currrentAppAttemptShuffleMergeId =
+new AppAttemptShuffleMergeId(appShuffleInfo.appId, 
appShuffleInfo.attemptId,
+shuffleId, latestShuffleMergeId);
 logger.info("{}: creating a new shuffle merge metadata since 
received " +
-"shuffleMergeId is higher than latest shuffleMergeId {}",
-appAttemptShuffleMergeId, latestShuffleMergeId);
+"shuffleMergeId {} is higher than latest shuffleMergeId {}",
+currrentAppAttemptShuffleMergeId, shuffleMergeId, 
latestShuffleMergeId);
 submitCleanupTask(() ->
-closeAndDeleteOutdatedPartitions(
-appAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+
closeAndDeleteOutdatedPartitions(currrentAppAttemptShuffleMergeId,
+mergePartitionsInfo.shuffleMergePartitions));
 return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
   } else {
 // The request is for block with same shuffleMergeId as the latest 
shuffleMergeId
@@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
   // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
then return
   // empty MergeStatuses but cleanup the older shuffleMergeId files.
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
   submitCleanupTask(() ->
   closeAndDeleteOutdatedPartitions(
-  appAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+  currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
 } else {
   // This block covers:
   //  1. finalization of determinate stage
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index eb2c1d9fa5c..6a595ee346d 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -28,7 +28,9 @@ 

[spark] branch master updated: [SPARK-41796][TESTS] Test the error class: UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE

2023-01-01 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b0751ed22b9 [SPARK-41796][TESTS] Test the error class: 
UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE
b0751ed22b9 is described below

commit b0751ed22b94a93a5a60a20b24a88ca77d67c694
Author: panbingkun 
AuthorDate: Sun Jan 1 21:45:56 2023 +0500

[SPARK-41796][TESTS] Test the error class: 
UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE

### What changes were proposed in this pull request?
This PR aims to modify a test for the error class 
UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE
 in SubquerySuite.

### Why are the changes needed?
The changes improve test coverage and document the expected error messages in
tests.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.
Updated the existing UT.

Closes #39320 from panbingkun/SPARK-41796.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 .../scala/org/apache/spark/sql/SubquerySuite.scala   | 20 ++--
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 65dd911df31..3d4a629f7a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2452,16 +2452,24 @@ class SubquerySuite extends QueryTest
 Row(2))
 
   // Cannot use non-orderable data type in one row subquery that cannot be 
collapsed.
-val error = intercept[AnalysisException] {
+  checkError(
+exception = intercept[AnalysisException] {
   sql(
-"""
-  |select (
+"""select (
   |  select concat(a, a) from
   |  (select upper(x['a'] + rand()) as a)
   |) from v1
-  |""".stripMargin).collect()
-}
-assert(error.getMessage.contains("Correlated column reference 'v1.x' 
cannot be map type"))
+  |""".stripMargin
+  ).collect()
+},
+errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+  "UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE",
+parameters = Map("expr" -> "v1.x", "dataType" -> "map"),
+context = ExpectedContext(
+  fragment = "select upper(x['a'] + rand()) as a",
+  start = 39,
+  stop = 72)
+  )
 }
   }
 





[spark] branch master updated (8cfa748a78e -> b1a1a3ccdef)

2023-01-01 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 8cfa748a78e [SPARK-41742][SPARK-41745][CONNECT] Reenable doc tests and 
add missing column alias to count()
 add b1a1a3ccdef [SPARK-41493][CONNECT][PYTHON] Make csv functions support 
options

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/functions.py| 43 ++
 .../sql/tests/connect/test_connect_function.py | 26 +
 2 files changed, 61 insertions(+), 8 deletions(-)





[spark] branch master updated (b6ff4aa237c -> 8cfa748a78e)

2023-01-01 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from b6ff4aa237c [SPARK-41799][CONNECT][PYTHON][TESTS] Combine plan-related 
tests into single file
 add 8cfa748a78e [SPARK-41742][SPARK-41745][CONNECT] Reenable doc tests and 
add missing column alias to count()

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/group.py |  2 +-
 python/pyspark/sql/group.py | 21 +++--
 2 files changed, 8 insertions(+), 15 deletions(-)





[spark] branch master updated: [SPARK-41799][CONNECT][PYTHON][TESTS] Combine plan-related tests into single file

2023-01-01 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b6ff4aa237c [SPARK-41799][CONNECT][PYTHON][TESTS] Combine plan-related 
tests into single file
b6ff4aa237c is described below

commit b6ff4aa237cd4dcce20d6244295d038a7d3cfab7
Author: Ruifeng Zheng 
AuthorDate: Sun Jan 1 16:06:50 2023 +0800

[SPARK-41799][CONNECT][PYTHON][TESTS] Combine plan-related tests into 
single file

### What changes were proposed in this pull request?
Combine plan-related tests into single file

### Why are the changes needed?
1. `test_connect_column_expressions`, `test_connect_plan_only`, and
`test_connect_select_ops` did almost the same thing: generate and then validate
a plan;

2. the three tests are pretty small and normally finish in about 1 sec.

```
Starting test(python3.9): 
pyspark.sql.tests.connect.test_connect_column_expressions (temp output: 
/__w/spark/spark/python/target/8d15e343-028e-44de-b998-6a7e7cc98047/python3.9__pyspark.sql.tests.connect.test_connect_column_expressions__fkvdhn26.log)
Finished test(python3.9): 
pyspark.sql.tests.connect.test_connect_column_expressions (0s)
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_function 
(temp output: 
/__w/spark/spark/python/target/b35958dc-4dd3-4420-8f44-cb6f66d568dc/python3.9__pyspark.sql.tests.connect.test_connect_function__te14hoiz.log)
Finished test(python3.9): pyspark.sql.tests.connect.test_connect_function 
(80s)
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_plan_only 
(temp output: 
/__w/spark/spark/python/target/3225a48d-5b4c-4cbe-803d-680c9408e3a8/python3.9__pyspark.sql.tests.connect.test_connect_plan_only__4bjohyey.log)
Finished test(python3.9): pyspark.sql.tests.connect.test_connect_plan_only 
(0s)
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_select_ops 
(temp output: 
/__w/spark/spark/python/target/fe6a37ff-9aa8-41d5-8204-44a86423381f/python3.9__pyspark.sql.tests.connect.test_connect_select_ops__cicvg0w7.log)
Finished test(python3.9): pyspark.sql.tests.connect.test_connect_select_ops 
(0s)
```

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
CI

Closes #39323 from zhengruifeng/connect_test_reorg.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 dev/sparktestsupport/modules.py|   4 +-
 .../sql/tests/connect/test_connect_basic.py|  24 ++-
 .../sql/tests/connect/test_connect_column.py   |   2 +-
 .../connect/test_connect_column_expressions.py | 195 --
 ...t_connect_plan_only.py => test_connect_plan.py} | 225 ++---
 .../sql/tests/connect/test_connect_select_ops.py   |  71 ---
 6 files changed, 225 insertions(+), 296 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index df3a1f180fc..dff17792148 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -509,9 +509,7 @@ pyspark_connect = Module(
 "pyspark.sql.connect.window",
 "pyspark.sql.connect.column",
 # unittests
-"pyspark.sql.tests.connect.test_connect_column_expressions",
-"pyspark.sql.tests.connect.test_connect_plan_only",
-"pyspark.sql.tests.connect.test_connect_select_ops",
+"pyspark.sql.tests.connect.test_connect_plan",
 "pyspark.sql.tests.connect.test_connect_basic",
 "pyspark.sql.tests.connect.test_connect_function",
 "pyspark.sql.tests.connect.test_connect_column",
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 6cdef25d5bc..0b615d2e32a 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -117,7 +117,7 @@ class SparkConnectSQLTestCase(PandasOnSparkTestCase, 
ReusedPySparkTestCase, SQLT
 cls.spark.sql("DROP TABLE IF EXISTS {}".format(cls.tbl_name_empty))
 
 
-class SparkConnectTests(SparkConnectSQLTestCase):
+class SparkConnectBasicTests(SparkConnectSQLTestCase):
 def test_df_get_item(self):
 # SPARK-41779: test __getitem__
 
@@ -1746,6 +1746,28 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 ):
 cdf.groupBy("name").pivot("department").sum("salary", 
"department").show()
 
+def test_unsupported_functions(self):
+# SPARK-41225: Disable unsupported functions.
+df = self.connect.read.table(self.tbl_name)
+for f in (
+"rdd",
+"unpersist",
+"cache",
+"persist",
+"withWatermark",
+"observe",
+"foreach",
+"foreachPartition",
+