WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1190646225

@@ -17,131 +17,172 @@
 package org.apache.spark.sql.streaming
-import org.scalatest.concurrent.Eventually.eventually
-import org.scalatest.concurrent.Futures.timeout
-import org.scalatest.time.SpanSugar._
+import java.io.FileWriter
-import org.apache.spark.sql.SQLHelper
+//import org.scalatest.concurrent.Eventually.eventually
+//import org.scalatest.concurrent.Futures.timeout
+//import org.scalatest.time.SpanSugar._
+import org.apache.spark.sql.{ForeachWriter, Row, SQLHelper}
 import org.apache.spark.sql.connect.client.util.RemoteSparkSession
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.functions.window
+//import org.apache.spark.sql.functions.col
+//import org.apache.spark.sql.functions.window
 class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
-  test("Streaming API with windowed aggregate query") {
-    // This verifies standard streaming API by starting a streaming query with 
windowed count.
-    withSQLConf(
-      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
-    ) {
-      val readDF = spark.readStream
-        .format("rate")
-        .option("rowsPerSecond", "10")
-        .option("numPartitions", "1")
-        .load()
-      // Verify schema (results in sending an RPC)
-      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
-      val countsDF = readDF
-        .withWatermark("timestamp", "10 seconds")
-        .groupBy(window(col("timestamp"), "5 seconds"))
-        .count()
-        .selectExpr("window.start as timestamp", "count as num_events")
-      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT 
-      // Start the query
-      val queryName = "sparkConnectStreamingQuery"
-      val query = countsDF.writeStream
-        .format("memory")
-        .queryName(queryName)
-        .trigger(Trigger.ProcessingTime("1 second"))
-        .start()
-      try {
-        // Verify some of the API.
-        assert(query.isActive)
-        eventually(timeout(10.seconds)) {
-          assert(query.status.isDataAvailable)
-          assert(query.recentProgress.nonEmpty) // Query made progress.
-        }
-        query.explain() // Prints the plan to console.
-        // Consider verifying explain output by capturing stdout similar to
-        // test("Dataset explain") in ClientE2ETestSuite.
-      } finally {
-        // Don't wait for any processed data. Otherwise the test could take 
multiple seconds.
-        query.stop()
+//  test("Streaming API with windowed aggregate query") {
+//    // This verifies standard streaming API by starting a streaming query 
with windowed count.
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      val readDF = spark.readStream
+//        .format("rate")
+//        .option("rowsPerSecond", "10")
+//        .option("numPartitions", "1")
+//        .load()
+//      // Verify schema (results in sending an RPC)
+//      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
+//      val countsDF = readDF
+//        .withWatermark("timestamp", "10 seconds")
+//        .groupBy(window(col("timestamp"), "5 seconds"))
+//        .count()
+//        .selectExpr("window.start as timestamp", "count as num_events")
+//      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT 
+//      // Start the query
+//      val queryName = "sparkConnectStreamingQuery"
+//      val query = countsDF.writeStream
+//        .format("memory")
+//        .queryName(queryName)
+//        .trigger(Trigger.ProcessingTime("1 second"))
+//        .start()
+//      try {
+//        // Verify some of the API.
+//        assert(query.isActive)
+//        eventually(timeout(10.seconds)) {
+//          assert(query.status.isDataAvailable)
+//          assert(query.recentProgress.nonEmpty) // Query made progress.
+//        }
+//        query.explain() // Prints the plan to console.
+//        // Consider verifying explain output by capturing stdout similar to
+//        // test("Dataset explain") in ClientE2ETestSuite.
+//      } finally {
+//        // Don't wait for any processed data. Otherwise the test could take 
multiple seconds.
+//        query.stop()
+//        // The query should still be accessible after stopped.
+//        assert(!query.isActive)
+//        assert(query.recentProgress.nonEmpty)
+//      }
+//    }
+//  }
+//  test("Streaming table API") {
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      spark.sql("DROP TABLE IF EXISTS my_table")
+//      withTempPath { ckpt =>
+//        val q1 = spark.readStream
+//          .format("rate")
+//          .load()
+//          .writeStream
+//          .option("checkpointLocation", ckpt.getCanonicalPath)
+//          .toTable("my_table")
+//        val q2 = spark.readStream
+//          .table("my_table")
+//          .writeStream
+//          .format("memory")
+//          .queryName("my_sink")
+//          .start()
+//        try {
+//          q1.processAllAvailable()
+//          q2.processAllAvailable()
+//          eventually(timeout(10.seconds)) {
+//            assert(spark.table("my_sink").count() > 0)
+//          }
+//        } finally {
+//          q1.stop()
+//          q2.stop()
+//          spark.sql("DROP TABLE my_table")
+//        }
+//      }
+//    }
+//  }
+//  test("awaitTermination") {
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      val q = spark.readStream
+//        .format("rate")
+//        .load()
+//        .writeStream
+//        .format("memory")
+//        .queryName("test")
+//        .start()
+//      val start = System.nanoTime
+//      val terminated = q.awaitTermination(500)
+//      val end = System.nanoTime
+//      assert((end - start) / 1e6 >= 500)
+//      assert(!terminated)
+//      q.stop()
+//      // TODO (SPARK-43032): uncomment below
+//      // eventually(timeout(1.minute)) {
+//      // q.awaitTermination()
+//      // }
+//    }
+//  }
+  test("foreach") {
+    withTempPath {
+      f =>
+        val path = f.getCanonicalPath + "/output"

Review Comment:
   This likely won't work, because `foreach` runs on the worker.

Reply via email to