Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158397203 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.continuous + +import java.io.{File, InterruptedIOException, IOException, UncheckedIOException} +import java.nio.channels.ClosedByInterruptException +import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit} + +import scala.reflect.ClassTag +import scala.util.control.ControlThrowable + +import com.google.common.util.concurrent.UncheckedExecutionException +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.plans.logical.Range +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes +import org.apache.spark.sql.execution.command.ExplainCommand +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec} +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.StreamSourceProvider +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.streaming.util.StreamManualClock +import org.apache.spark.sql.test.TestSparkSession +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +class ContinuousSuiteBase extends StreamTest { + // We need more than the default local[2] to be able to schedule all partitions simultaneously. + override protected def createSparkSession = new TestSparkSession( + new SparkContext( + "local[10]", + "continuous-stream-test-sql-context", + sparkConf.set("spark.sql.testkey", "true"))) + + protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = { + query match { + case s: ContinuousExecution => + assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized") + val reader = s.lastExecution.executedPlan.collectFirst { + case DataSourceV2ScanExec(_, r: ContinuousRateStreamReader) => r + }.get + + val deltaMs = numTriggers * 1000 + 300 + while (System.currentTimeMillis < reader.creationTime + deltaMs) { + Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis) + } + } + } + + // A continuous trigger that will only fire the initial time for the duration of a test. + // This allows clean testing with manual epoch advancement. + protected val longContinuousTrigger = Trigger.Continuous("1 hour") +} + +class ContinuousSuite extends ContinuousSuiteBase { + import testImplicits._ + + test("basic rate source") { + val df = spark.readStream + .format("rate") + .option("numPartitions", "5") + .option("rowsPerSecond", "5") + .load() + .select('value) + + testStream(df, useV2Sink = true)( + StartStream(longContinuousTrigger), + AwaitEpoch(0), + Execute(waitForRateSourceTriggers(_, 2)), + IncrementEpoch(), + CheckAnswer(scala.Range(0, 10): _*), --- End diff -- let's not do an exact match check here.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org