WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1190646225


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -17,131 +17,172 @@
 
 package org.apache.spark.sql.streaming
 
-import org.scalatest.concurrent.Eventually.eventually
-import org.scalatest.concurrent.Futures.timeout
-import org.scalatest.time.SpanSugar._
+import java.io.FileWriter
 
-import org.apache.spark.sql.SQLHelper
+//import org.scalatest.concurrent.Eventually.eventually
+//import org.scalatest.concurrent.Futures.timeout
+//import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.{ForeachWriter, Row, SQLHelper}
 import org.apache.spark.sql.connect.client.util.RemoteSparkSession
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.functions.window
+//import org.apache.spark.sql.functions.col
+//import org.apache.spark.sql.functions.window
 
 class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
 
-  test("Streaming API with windowed aggregate query") {
-    // This verifies standard streaming API by starting a streaming query with windowed count.
-    withSQLConf(
-      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
-    ) {
-      val readDF = spark.readStream
-        .format("rate")
-        .option("rowsPerSecond", "10")
-        .option("numPartitions", "1")
-        .load()
-
-      // Verify schema (results in sending an RPC)
-      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
-
-      val countsDF = readDF
-        .withWatermark("timestamp", "10 seconds")
-        .groupBy(window(col("timestamp"), "5 seconds"))
-        .count()
-        .selectExpr("window.start as timestamp", "count as num_events")
-
-      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT NOT NULL")
-
-      // Start the query
-      val queryName = "sparkConnectStreamingQuery"
-
-      val query = countsDF.writeStream
-        .format("memory")
-        .queryName(queryName)
-        .trigger(Trigger.ProcessingTime("1 second"))
-        .start()
-
-      try {
-        // Verify some of the API.
-        assert(query.isActive)
-
-        eventually(timeout(10.seconds)) {
-          assert(query.status.isDataAvailable)
-          assert(query.recentProgress.nonEmpty) // Query made progress.
-        }
-
-        query.explain() // Prints the plan to console.
-        // Consider verifying explain output by capturing stdout similar to
-        // test("Dataset explain") in ClientE2ETestSuite.
-
-      } finally {
-        // Don't wait for any processed data. Otherwise the test could take multiple seconds.
-        query.stop()
+//  test("Streaming API with windowed aggregate query") {
+//    // This verifies standard streaming API by starting a streaming query with windowed count.
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      val readDF = spark.readStream
+//        .format("rate")
+//        .option("rowsPerSecond", "10")
+//        .option("numPartitions", "1")
+//        .load()
+//
+//      // Verify schema (results in sending an RPC)
+//      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
+//
+//      val countsDF = readDF
+//        .withWatermark("timestamp", "10 seconds")
+//        .groupBy(window(col("timestamp"), "5 seconds"))
+//        .count()
+//        .selectExpr("window.start as timestamp", "count as num_events")
+//
+//      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT NOT NULL")
+//
+//      // Start the query
+//      val queryName = "sparkConnectStreamingQuery"
+//
+//      val query = countsDF.writeStream
+//        .format("memory")
+//        .queryName(queryName)
+//        .trigger(Trigger.ProcessingTime("1 second"))
+//        .start()
+//
+//      try {
+//        // Verify some of the API.
+//        assert(query.isActive)
+//
+//        eventually(timeout(10.seconds)) {
+//          assert(query.status.isDataAvailable)
+//          assert(query.recentProgress.nonEmpty) // Query made progress.
+//        }
+//
+//        query.explain() // Prints the plan to console.
+//        // Consider verifying explain output by capturing stdout similar to
+//        // test("Dataset explain") in ClientE2ETestSuite.
+//
+//      } finally {
+//        // Don't wait for any processed data. Otherwise the test could take multiple seconds.
+//        query.stop()
+//
+//        // The query should still be accessible after stopped.
+//        assert(!query.isActive)
+//        assert(query.recentProgress.nonEmpty)
+//      }
+//    }
+//  }
+
+//  test("Streaming table API") {
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      spark.sql("DROP TABLE IF EXISTS my_table")
+//
+//      withTempPath { ckpt =>
+//        val q1 = spark.readStream
+//          .format("rate")
+//          .load()
+//          .writeStream
+//          .option("checkpointLocation", ckpt.getCanonicalPath)
+//          .toTable("my_table")
+//
+//        val q2 = spark.readStream
+//          .table("my_table")
+//          .writeStream
+//          .format("memory")
+//          .queryName("my_sink")
+//          .start()
+//
+//        try {
+//          q1.processAllAvailable()
+//          q2.processAllAvailable()
+//          eventually(timeout(10.seconds)) {
+//            assert(spark.table("my_sink").count() > 0)
+//          }
+//        } finally {
+//          q1.stop()
+//          q2.stop()
+//          spark.sql("DROP TABLE my_table")
+//        }
+//      }
+//    }
+//  }
+
+//  test("awaitTermination") {
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      val q = spark.readStream
+//        .format("rate")
+//        .load()
+//        .writeStream
+//        .format("memory")
+//        .queryName("test")
+//        .start()
+//
+//      val start = System.nanoTime
+//      val terminated = q.awaitTermination(500)
+//      val end = System.nanoTime
+//      assert((end - start) / 1e6 >= 500)
+//      assert(!terminated)
+//
+//      q.stop()
+//      // TODO (SPARK-43032): uncomment below
+//      // eventually(timeout(1.minute)) {
+//      // q.awaitTermination()
+//      // }
+//    }
+//  }
+
+  test("foreach") {
+    withTempPath {
+      f =>
+        val path = f.getCanonicalPath + "/output"

Review Comment:
   This likely won't work, since `foreach` runs on the worker, not on the client that created this temp path.
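   Roughly what I mean, as a hypothetical sketch (the class name and the /tmp path are my assumptions, not this PR's code): the writer's `open`/`process`/`close` all execute on the executors, so the directory it writes to has to be visible and writable on the worker side, not on the Connect client where `withTempPath` creates it.

```scala
// Hypothetical sketch, not this PR's test code.
import java.io.{File, FileWriter}
import org.apache.spark.sql.{ForeachWriter, Row}

// A standalone writer class, so only the path string travels to the workers.
class FileSinkWriter(outputDir: String) extends ForeachWriter[Row] {
  @transient private var writer: FileWriter = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    new File(outputDir).mkdirs() // runs on the executor, not on the client
    writer = new FileWriter(s"$outputDir/$partitionId-$epochId.txt")
    true
  }

  override def process(row: Row): Unit = {
    writer.write(row.mkString(",") + "\n") // one line per input row, on the worker
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (writer != null) writer.close()
  }
}

// "/tmp/foreach-sink" is an assumption: it must be writable on the workers,
// which in a remote Connect setup is not the machine running this test.
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .foreach(new FileSinkWriter("/tmp/foreach-sink"))
  .start()

query.processAllAvailable()
query.stop()
```

   So the test probably needs either a sink the client can check directly (e.g. the memory sink used by the other tests in this file) or a path that both the client and the workers can reach.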


