Repository: spark Updated Branches: refs/heads/master a9aacdf1c -> 8ed044928
[SPARK-25204][SS] Fix race in rate source test. ## What changes were proposed in this pull request? Fix a race in the rate source tests. We need a better way of testing restart behavior. ## How was this patch tested? unit test Closes #22191 from jose-torres/racetest. Authored-by: Jose Torres <torres.joseph.f+git...@gmail.com> Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ed04492 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ed04492 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ed04492 Branch: refs/heads/master Commit: 8ed0449285507459bbd00752338ed3242427a14f Parents: a9aacdf Author: Jose Torres <torres.joseph.f+git...@gmail.com> Authored: Thu Aug 23 12:14:27 2018 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Thu Aug 23 12:14:27 2018 -0700 ---------------------------------------------------------------------- .../sources/RateStreamProviderSuite.scala | 40 ++++++++++++++++++-- .../apache/spark/sql/streaming/StreamTest.scala | 5 ++- 2 files changed, 40 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8ed04492/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala index 9c1756d..dd74af8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming._ @@ -81,12 +82,43 @@ class RateSourceSuite extends StreamTest { .load() testStream(input)( AdvanceRateManualClock(seconds = 1), - CheckLastBatch((0 until 10).map(v => new java.sql.Timestamp(v * 100L) -> v): _*), + CheckLastBatch((0 until 10).map(v => new java.sql.Timestamp(v * 100L) -> v): _*) + ) + } + + test("microbatch - restart") { + val input = spark.readStream + .format("rate") + .option("rowsPerSecond", "10") + .load() + .select('value) + + var streamDuration = 0 + + // Microbatch rate stream offsets contain the number of seconds since the beginning of + // the stream. + def updateStreamDurationFromOffset(s: StreamExecution, expectedMin: Int): Unit = { + streamDuration = s.lastProgress.sources(0).endOffset.toInt + assert(streamDuration >= expectedMin) + } + + // We have to use the lambda version of CheckAnswer because we don't know the right range + // until we see the last offset. + def expectedResultsFromDuration(rows: Seq[Row]): Unit = { + assert(rows.map(_.getLong(0)).sorted == (0 until (streamDuration * 10))) + } + + testStream(input)( + StartStream(), + Execute(_.awaitOffset(0, LongOffset(2), streamingTimeout.toMillis)), StopStream, + Execute(updateStreamDurationFromOffset(_, 2)), + CheckAnswer(expectedResultsFromDuration _), StartStream(), - // Advance 2 seconds because creating a new RateSource will also create a new ManualClock - AdvanceRateManualClock(seconds = 2), - CheckLastBatch((10 until 20).map(v => new java.sql.Timestamp(v * 100L) -> v): _*) + Execute(_.awaitOffset(0, LongOffset(4), streamingTimeout.toMillis)), + StopStream, + Execute(updateStreamDurationFromOffset(_, 4)), + CheckAnswer(expectedResultsFromDuration _) ) } http://git-wip-us.apache.org/repos/asf/spark/blob/8ed04492/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index cd9b892..491dc34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -735,7 +735,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be } case CheckAnswerRowsByFunc(globalCheckFunction, lastOnly) => - val sparkAnswer = fetchStreamAnswer(currentStream, lastOnly) + val sparkAnswer = currentStream match { + case null => fetchStreamAnswer(lastStream, lastOnly) + case s => fetchStreamAnswer(s, lastOnly) + } try { globalCheckFunction(sparkAnswer) } catch { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org