This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cd7f1166fc [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query
3cd7f1166fc is described below

commit 3cd7f1166fc949abd79d3fb430e855b2925b038c
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Thu Nov 24 20:27:18 2022 +0900

    [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add a new test suite specifically for self-union tests on streaming queries. The test cases are acceptance tests for four different cases: DSv1 vs DSv2, each read via the DataStreamReader API vs the table API.
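    
    For illustration, the self-union pattern under test boils down to the sketch below. This is a minimal sketch, not the exact suite code, and it assumes it runs inside a StreamTest subclass with `import testImplicits._` in scope (which supplies the implicit SQLContext and encoders that MemoryStream needs):
    
    ```scala
    import org.apache.spark.sql.execution.streaming.MemoryStream
    
    val inputData = MemoryStream[Int]          // DSv2 in-memory source
    val streamDf = inputData.toDF()            // one streaming DataFrame
    val unionedDf = streamDf.union(streamDf)   // unioned with itself
    
    // Expectation: every input row appears twice in the output, while the
    // query progress still reports a single source whose numInputRows
    // counts both branches of the union.
    ```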
    
    ### Why are the changes needed?
    
    This PR brings more test coverage for streaming workloads. We would have caught an issue during the work on [SPARK-39564](https://issues.apache.org/jira/browse/SPARK-39564) if we had had this test suite.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New test suite.
    
    Closes #38785 from HeartSaVioR/SPARK-41249.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/streaming/StreamingSelfUnionSuite.scala    | 160 +++++++++++++++++++++
 1 file changed, 160 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
new file mode 100644
index 00000000000..8f099c31e6b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  before {
+    spark.conf.set("spark.sql.catalog.teststream", classOf[InMemoryStreamTableCatalog].getName)
+  }
+
+  after {
+    spark.sessionState.catalogManager.reset()
+    spark.sessionState.conf.clear()
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("self-union, DSv1, read via DataStreamReader API") {
+    withTempPath { dir =>
+      val dataLocation = dir.getAbsolutePath
+      spark.range(1, 4).write.format("parquet").save(dataLocation)
+
+      val streamDf = spark.readStream.format("parquet")
+        .schema(StructType(Seq(StructField("id", LongType)))).load(dataLocation)
+      val unionedDf = streamDf.union(streamDf)
+
+      testStream(unionedDf)(
+        ProcessAllAvailable(),
+        CheckLastBatch(1, 2, 3, 1, 2, 3),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 6)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 6)
+          true
+        }
+      )
+    }
+  }
+
+  test("self-union, DSv1, read via table API") {
+    withTable("parquet_streaming_tbl") {
+      spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet")
+
+      val streamDf = spark.readStream.table("parquet_streaming_tbl")
+      val unionedDf = streamDf.union(streamDf)
+
+      val clock = new StreamManualClock()
+      testStream(unionedDf)(
+        StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
+        Execute { _ =>
+          spark.range(1, 4).selectExpr("id AS key")
+            .write.format("parquet").mode(SaveMode.Append).saveAsTable("parquet_streaming_tbl")
+        },
+        AdvanceManualClock(150),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch(1, 2, 3, 1, 2, 3),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 6)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 6)
+          true
+        }
+      )
+    }
+  }
+
+  test("self-union, DSv2, read via DataStreamReader API") {
+    val inputData = MemoryStream[Int]
+
+    val streamDf = inputData.toDF()
+    val unionedDf = streamDf.union(streamDf)
+
+    testStream(unionedDf)(
+      AddData(inputData, 1, 2, 3),
+      CheckLastBatch(1, 2, 3, 1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 6)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 6)
+        true
+      }
+    )
+  }
+
+  test("self-union, DSv2, read via table API") {
+    val tblName = "teststream.table_name"
+    withTable(tblName) {
+      spark.sql(s"CREATE TABLE $tblName (data int) USING foo")
+      val stream = MemoryStream[Int]
+      val testCatalog = spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
+      val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+      table.asInstanceOf[InMemoryStreamTable].setStream(stream)
+
+      val streamDf = spark.readStream.table(tblName)
+      val unionedDf = streamDf.union(streamDf)
+
+      testStream(unionedDf)(
+        AddData(stream, 1, 2, 3),
+        CheckLastBatch(1, 2, 3, 1, 2, 3),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 6)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 6)
+          true
+        }
+      )
+    }
+  }
+
+  private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
+    eventually(Timeout(streamingTimeout)) {
+      if (!q.exception.isDefined) {
+        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+      }
+    }
+    if (q.exception.isDefined) {
+      throw q.exception.get
+    }
+    true
+  }
+
+  private def getLastProgressWithData(q: StreamingQuery): Option[StreamingQueryProgress] = {
+    q.recentProgress.filter(_.numInputRows > 0).lastOption
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
