WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1190646225
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -17,131 +17,172 @@
 package org.apache.spark.sql.streaming
-import org.scalatest.concurrent.Eventually.eventually
-import org.scalatest.concurrent.Futures.timeout
-import org.scalatest.time.SpanSugar._
+import java.io.FileWriter
-import org.apache.spark.sql.SQLHelper
+//import org.scalatest.concurrent.Eventually.eventually
+//import org.scalatest.concurrent.Futures.timeout
+//import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.{ForeachWriter, Row, SQLHelper}
 import org.apache.spark.sql.connect.client.util.RemoteSparkSession
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.functions.window
+//import org.apache.spark.sql.functions.col
+//import org.apache.spark.sql.functions.window
 class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
-  test("Streaming API with windowed aggregate query") {
-    // This verifies standard streaming API by starting a streaming query with windowed count.
-    withSQLConf(
-      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
-    ) {
-      val readDF = spark.readStream
-        .format("rate")
-        .option("rowsPerSecond", "10")
-        .option("numPartitions", "1")
-        .load()
-
-      // Verify schema (results in sending an RPC)
-      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
-
-      val countsDF = readDF
-        .withWatermark("timestamp", "10 seconds")
-        .groupBy(window(col("timestamp"), "5 seconds"))
-        .count()
-        .selectExpr("window.start as timestamp", "count as num_events")
-
-      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT NOT NULL")
-
-      // Start the query
-      val queryName = "sparkConnectStreamingQuery"
-
-      val query = countsDF.writeStream
-        .format("memory")
-        .queryName(queryName)
-        .trigger(Trigger.ProcessingTime("1 second"))
-        .start()
-
-      try {
-        // Verify some of the API.
-        assert(query.isActive)
-
-        eventually(timeout(10.seconds)) {
-          assert(query.status.isDataAvailable)
-          assert(query.recentProgress.nonEmpty) // Query made progress.
-        }
-
-        query.explain() // Prints the plan to console.
-        // Consider verifying explain output by capturing stdout similar to
-        // test("Dataset explain") in ClientE2ETestSuite.
-
-      } finally {
-        // Don't wait for any processed data. Otherwise the test could take multiple seconds.
-        query.stop()
+//  test("Streaming API with windowed aggregate query") {
+//    // This verifies standard streaming API by starting a streaming query with windowed count.
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      val readDF = spark.readStream
+//        .format("rate")
+//        .option("rowsPerSecond", "10")
+//        .option("numPartitions", "1")
+//        .load()
+//
+//      // Verify schema (results in sending an RPC)
+//      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
+//
+//      val countsDF = readDF
+//        .withWatermark("timestamp", "10 seconds")
+//        .groupBy(window(col("timestamp"), "5 seconds"))
+//        .count()
+//        .selectExpr("window.start as timestamp", "count as num_events")
+//
+//      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT NOT NULL")
+//
+//      // Start the query
+//      val queryName = "sparkConnectStreamingQuery"
+//
+//      val query = countsDF.writeStream
+//        .format("memory")
+//        .queryName(queryName)
+//        .trigger(Trigger.ProcessingTime("1 second"))
+//        .start()
+//
+//      try {
+//        // Verify some of the API.
+//        assert(query.isActive)
+//
+//        eventually(timeout(10.seconds)) {
+//          assert(query.status.isDataAvailable)
+//          assert(query.recentProgress.nonEmpty) // Query made progress.
+//        }
+//
+//        query.explain() // Prints the plan to console.
+//        // Consider verifying explain output by capturing stdout similar to
+//        // test("Dataset explain") in ClientE2ETestSuite.
+//
+//      } finally {
+//        // Don't wait for any processed data. Otherwise the test could take multiple seconds.
+//        query.stop()
+//
+//        // The query should still be accessible after stopped.
+//        assert(!query.isActive)
+//        assert(query.recentProgress.nonEmpty)
+//      }
+//    }
+//  }
+
+//  test("Streaming table API") {
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      spark.sql("DROP TABLE IF EXISTS my_table")
+//
+//      withTempPath { ckpt =>
+//        val q1 = spark.readStream
+//          .format("rate")
+//          .load()
+//          .writeStream
+//          .option("checkpointLocation", ckpt.getCanonicalPath)
+//          .toTable("my_table")
+//
+//        val q2 = spark.readStream
+//          .table("my_table")
+//          .writeStream
+//          .format("memory")
+//          .queryName("my_sink")
+//          .start()
+//
+//        try {
+//          q1.processAllAvailable()
+//          q2.processAllAvailable()
+//          eventually(timeout(10.seconds)) {
+//            assert(spark.table("my_sink").count() > 0)
+//          }
+//        } finally {
+//          q1.stop()
+//          q2.stop()
+//          spark.sql("DROP TABLE my_table")
+//        }
+//      }
+//    }
+//  }
+
+//  test("awaitTermination") {
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      val q = spark.readStream
+//        .format("rate")
+//        .load()
+//        .writeStream
+//        .format("memory")
+//        .queryName("test")
+//        .start()
+//
+//      val start = System.nanoTime
+//      val terminated = q.awaitTermination(500)
+//      val end = System.nanoTime
+//      assert((end - start) / 1e6 >= 500)
+//      assert(!terminated)
+//
+//      q.stop()
+//      // TODO (SPARK-43032): uncomment below
+//      // eventually(timeout(1.minute)) {
+//      //   q.awaitTermination()
+//      // }
+//    }
+//  }
+
+  test("foreach") {
+    withTempPath {
+      f =>
+        val path = f.getCanonicalPath + "/output"

Review Comment:
   This likely won't work as written: `foreach` runs the `ForeachWriter` on the workers (executors), not on the client, so a file written under this client-side temp path will not be visible to the test.
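To make the concern above concrete, here is a minimal, hypothetical sketch (not code from this PR; the `FileRecordingWriter` name and file layout are assumptions for illustration). The `ForeachWriter` passed to `foreach` is constructed on the client, serialized, and then has its `open`/`process`/`close` callbacks invoked in the executor JVMs, so any file it creates lands on the executor's local filesystem rather than in the test's `withTempPath` directory:

```scala
import java.io.{File, FileWriter}

import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical writer used only to illustrate where each callback runs.
class FileRecordingWriter(path: String) extends ForeachWriter[Row] {
  // Constructed on the client, then serialized to the executors; the actual
  // FileWriter therefore has to be created in open(), not in the constructor.
  @transient private var out: FileWriter = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    // Runs on an executor: `path` is resolved against the executor's local
    // filesystem, which is why the test JVM never sees the output directory.
    new File(path).mkdirs()
    out = new FileWriter(new File(path, s"part-$partitionId-$epochId"), true)
    true
  }

  override def process(row: Row): Unit = out.write(row.toString + "\n")

  override def close(errorOrNull: Throwable): Unit = {
    if (out != null) out.close()
  }
}

// Wiring it into a streaming query would look like:
//   df.writeStream.foreach(new FileRecordingWriter(path)).start()
```

If the goal is only to verify that `foreach` is plumbed through the Connect client, an assertion that does not depend on the client filesystem, for example checking `query.lastProgress.sink.numOutputRows` once the query has made progress, might be a more reliable check, assuming progress reporting is already supported by the client.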