Repository: spark
Updated Branches:
  refs/heads/master a9aacdf1c -> 8ed044928


[SPARK-25204][SS] Fix race in rate source test.

## What changes were proposed in this pull request?

Fix a race in the rate source tests. We need a better way of testing restart 
behavior.

## How was this patch tested?

unit test

Closes #22191 from jose-torres/racetest.

Authored-by: Jose Torres <torres.joseph.f+git...@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ed04492
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ed04492
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ed04492

Branch: refs/heads/master
Commit: 8ed0449285507459bbd00752338ed3242427a14f
Parents: a9aacdf
Author: Jose Torres <torres.joseph.f+git...@gmail.com>
Authored: Thu Aug 23 12:14:27 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Aug 23 12:14:27 2018 -0700

----------------------------------------------------------------------
 .../sources/RateStreamProviderSuite.scala       | 40 ++++++++++++++++++--
 .../apache/spark/sql/streaming/StreamTest.scala |  5 ++-
 2 files changed, 40 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ed04492/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 9c1756d..dd74af8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
@@ -81,12 +82,43 @@ class RateSourceSuite extends StreamTest {
       .load()
     testStream(input)(
       AdvanceRateManualClock(seconds = 1),
-      CheckLastBatch((0 until 10).map(v => new java.sql.Timestamp(v * 100L) -> v): _*),
+      CheckLastBatch((0 until 10).map(v => new java.sql.Timestamp(v * 100L) -> v): _*)
+    )
+  }
+
+  test("microbatch - restart") {
+    val input = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", "10")
+      .load()
+      .select('value)
+
+    var streamDuration = 0
+
+    // Microbatch rate stream offsets contain the number of seconds since the beginning of
+    // the stream.
+    def updateStreamDurationFromOffset(s: StreamExecution, expectedMin: Int): Unit = {
+      streamDuration = s.lastProgress.sources(0).endOffset.toInt
+      assert(streamDuration >= expectedMin)
+    }
+
+    // We have to use the lambda version of CheckAnswer because we don't know the right range
+    // until we see the last offset.
+    def expectedResultsFromDuration(rows: Seq[Row]): Unit = {
+      assert(rows.map(_.getLong(0)).sorted == (0 until (streamDuration * 10)))
+    }
+
+    testStream(input)(
+      StartStream(),
+      Execute(_.awaitOffset(0, LongOffset(2), streamingTimeout.toMillis)),
       StopStream,
+      Execute(updateStreamDurationFromOffset(_, 2)),
+      CheckAnswer(expectedResultsFromDuration _),
       StartStream(),
-      // Advance 2 seconds because creating a new RateSource will also create a new ManualClock
-      AdvanceRateManualClock(seconds = 2),
-      CheckLastBatch((10 until 20).map(v => new java.sql.Timestamp(v * 100L) -> v): _*)
+      Execute(_.awaitOffset(0, LongOffset(4), streamingTimeout.toMillis)),
+      StopStream,
+      Execute(updateStreamDurationFromOffset(_, 4)),
+      CheckAnswer(expectedResultsFromDuration _)
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed04492/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index cd9b892..491dc34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -735,7 +735,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
           }
 
         case CheckAnswerRowsByFunc(globalCheckFunction, lastOnly) =>
-          val sparkAnswer = fetchStreamAnswer(currentStream, lastOnly)
+          val sparkAnswer = currentStream match {
+            case null => fetchStreamAnswer(lastStream, lastOnly)
+            case s => fetchStreamAnswer(s, lastOnly)
+          }
           try {
             globalCheckFunction(sparkAnswer)
           } catch {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to