Repository: spark
Updated Branches:
  refs/heads/master d610d2a3f -> 3fabbc576


[SPARK-24040][SS] Support single partition aggregates in continuous processing.

## What changes were proposed in this pull request?

Support aggregates with exactly 1 partition in continuous processing.

A few small tweaks are needed to make this work:

* Replace currentEpoch tracking with a ThreadLocal (see the sketch after this
list). This means the current epoch is scoped to a task rather than a node,
but I think that's sustainable even once we add shuffle.
* Add a new testing-only flag to disable the UnsupportedOperationChecker 
whitelist of allowed continuous processing nodes. I think this is preferable to 
writing a pile of custom logic to enforce that there is in fact only 1 
partition; we plan to support multi-partition aggregates before the next Spark 
release, so we'd just have to tear that logic back out.
* Restart continuous processing queries from the first available uncommitted 
epoch, rather than one that's guaranteed to be unused. This is required for 
stateful operators to overwrite partial state from the previous attempt at the 
epoch, and there was no specific motivation for the original strategy. In 
another PR before stabilizing the StreamWriter API, we'll need to narrow down 
and document more precise semantic guarantees for the epoch IDs.
* We need a single-partition ContinuousMemoryStream. Unfortunately, because of
how MemoryStream is constructed, the partition count can't be passed as a text
option the way it is for the rate source.
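
As a quick illustration of the first bullet, here's a minimal, self-contained
sketch of the per-task epoch pattern (it mirrors the EpochTracker object added
in this patch; the two simulated task threads and the processEpoch helper are
illustrative only, not part of the change):

    import java.util.concurrent.atomic.AtomicLong

    object EpochTrackingSketch {
      // One counter per task thread; -1 means "this task has no current epoch yet".
      private val currentEpoch = new ThreadLocal[AtomicLong] {
        override def initialValue(): AtomicLong = new AtomicLong(-1)
      }

      def initializeCurrentEpoch(start: Long): Unit = currentEpoch.get().set(start)
      def incrementCurrentEpoch(): Unit = currentEpoch.get().incrementAndGet()
      def getCurrentEpoch: Option[Long] = currentEpoch.get().get() match {
        case n if n < 0 => None
        case e => Some(e)
      }

      def main(args: Array[String]): Unit = {
        // Two simulated "tasks" on separate threads: each sees only its own epoch.
        val tasks = Seq(100L, 200L).map { start =>
          new Thread(new Runnable {
            override def run(): Unit = {
              initializeCurrentEpoch(start)
              (0 until 3).foreach { _ =>
                processEpoch(getCurrentEpoch.get) // e.g. write the epoch, commit, report offsets
                incrementCurrentEpoch()
              }
            }
          })
        }
        tasks.foreach(_.start())
        tasks.foreach(_.join())
      }

      private def processEpoch(epoch: Long): Unit =
        println(s"${Thread.currentThread().getName} processing epoch $epoch")
    }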

## How was this patch tested?

New unit tests: a new ContinuousAggregationSuite, plus an update to
ContinuousQueuedDataReaderSuite. A condensed sketch of the new suite follows.
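
For reference, a condensed sketch of the main new test, taken from the
ContinuousAggregationSuite added in this patch (the suite and test names here
are just for illustration):

    package org.apache.spark.sql.streaming.continuous

    import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.OutputMode

    class SinglePartitionAggregationSketch extends ContinuousSuiteBase {
      import testImplicits._

      test("single-partition max") {
        // The testing-only flag disables the continuous-processing whitelist check, and
        // the single-partition memory stream keeps the aggregate to exactly 1 partition.
        withSQLConf(("spark.sql.streaming.unsupportedOperationCheck", "false")) {
          val input = ContinuousMemoryStream.singlePartition[Int]

          testStream(input.toDF().agg(max('value)), OutputMode.Complete)(
            AddData(input, 0, 1, 2),
            CheckAnswer(2),
            StopStream,
            AddData(input, 3, 4, 5),
            StartStream(),
            CheckAnswer(5))
        }
      }
    }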

Author: Jose Torres <torres.joseph.f+git...@gmail.com>

Closes #21239 from jose-torres/withAggr.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fabbc57
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fabbc57
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fabbc57

Branch: refs/heads/master
Commit: 3fabbc576203c7fd63808a259adafc5c3cea1838
Parents: d610d2a
Author: Jose Torres <torres.joseph.f+git...@gmail.com>
Authored: Tue May 15 10:25:29 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Tue May 15 10:25:29 2018 -0700

----------------------------------------------------------------------
 .../analysis/UnsupportedOperationChecker.scala  |  1 +
 .../continuous/ContinuousExecution.scala        | 11 +--
 .../continuous/ContinuousQueuedDataReader.scala |  7 +-
 .../continuous/ContinuousWriteRDD.scala         | 18 +++--
 .../streaming/continuous/EpochTracker.scala     | 58 ++++++++++++++++
 .../sources/ContinuousMemoryStream.scala        | 14 ++--
 .../streaming/state/StateStoreRDD.scala         | 10 ++-
 .../sql/streaming/StreamingQueryManager.scala   |  4 +-
 .../continuous/ContinuousAggregationSuite.scala | 72 ++++++++++++++++++++
 .../ContinuousQueuedDataReaderSuite.scala       |  1 +
 10 files changed, 167 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3fabbc57/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index d3d6c63..2bed416 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3fabbc57/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index f58146a..0e7d101 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -122,16 +122,7 @@ class ContinuousExecution(
             s"Batch $latestEpochId was committed without end epoch offsets!")
         }
         committedOffsets = nextOffsets.toStreamProgress(sources)
-
-        // Get to an epoch ID that has definitely never been sent to a sink before. Since sink
-        // commit happens between offset log write and commit log write, this means an epoch ID
-        // which is not in the offset log.
-        val (latestOffsetEpoch, _) = offsetLog.getLatest().getOrElse {
-          throw new IllegalStateException(
-            s"Offset log had no latest element. This shouldn't be possible because nextOffsets is" +
-              s"an element.")
-        }
-        currentBatchId = latestOffsetEpoch + 1
+        currentBatchId = latestEpochId + 1
 
         logDebug(s"Resuming at epoch $currentBatchId with committed offsets 
$committedOffsets")
         nextOffsets

http://git-wip-us.apache.org/repos/asf/spark/blob/3fabbc57/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
index d864557..f38577b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
@@ -46,8 +46,6 @@ class ContinuousQueuedDataReader(
   // Important sequencing - we must get our starting point before the provider threads start running
   private var currentOffset: PartitionOffset =
     ContinuousDataSourceRDD.getContinuousReader(reader).getOffset
-  private var currentEpoch: Long =
-    context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
 
   /**
    * The record types in the read buffer.
@@ -115,8 +113,7 @@ class ContinuousQueuedDataReader(
     currentEntry match {
       case EpochMarker =>
         epochCoordEndpoint.send(ReportPartitionOffset(
-          context.partitionId(), currentEpoch, currentOffset))
-        currentEpoch += 1
+          context.partitionId(), EpochTracker.getCurrentEpoch.get, currentOffset))
         null
       case ContinuousRow(row, offset) =>
         currentOffset = offset
@@ -184,7 +181,7 @@ class ContinuousQueuedDataReader(
 
     private val epochCoordEndpoint = EpochCoordinatorRef.get(
       context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), SparkEnv.get)
-    // Note that this is *not* the same as the currentEpoch in [[ContinuousDataQueuedReader]]! That
+    // Note that this is *not* the same as the currentEpoch in [[ContinuousWriteRDD]]! That
     // field represents the epoch wrt the data being processed. The currentEpoch here is just a
     // counter to ensure we send the appropriate number of markers if we fall behind the driver.
     private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong

http://git-wip-us.apache.org/repos/asf/spark/blob/3fabbc57/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
index 91f1576..ef5f0da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
@@ -45,7 +45,8 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactor
     val epochCoordinator = EpochCoordinatorRef.get(
       context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
       SparkEnv.get)
-    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+    EpochTracker.initializeCurrentEpoch(
+      context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
 
     while (!context.isInterrupted() && !context.isCompleted()) {
       var dataWriter: DataWriter[InternalRow] = null
@@ -54,19 +55,24 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactor
         try {
           val dataIterator = prev.compute(split, context)
           dataWriter = writeTask.createDataWriter(
-            context.partitionId(), context.attemptNumber(), currentEpoch)
+            context.partitionId(),
+            context.attemptNumber(),
+            EpochTracker.getCurrentEpoch.get)
           while (dataIterator.hasNext) {
             dataWriter.write(dataIterator.next())
           }
           logInfo(s"Writer for partition ${context.partitionId()} " +
-            s"in epoch $currentEpoch is committing.")
+            s"in epoch ${EpochTracker.getCurrentEpoch.get} is committing.")
           val msg = dataWriter.commit()
           epochCoordinator.send(
-            CommitPartitionEpoch(context.partitionId(), currentEpoch, msg)
+            CommitPartitionEpoch(
+              context.partitionId(),
+              EpochTracker.getCurrentEpoch.get,
+              msg)
           )
           logInfo(s"Writer for partition ${context.partitionId()} " +
-            s"in epoch $currentEpoch committed.")
-          currentEpoch += 1
+            s"in epoch ${EpochTracker.getCurrentEpoch.get} committed.")
+          EpochTracker.incrementCurrentEpoch()
         } catch {
           case _: InterruptedException =>
           // Continuous shutdown always involves an interrupt. Just finish the task.

http://git-wip-us.apache.org/repos/asf/spark/blob/3fabbc57/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala
new file mode 100644
index 0000000..bc0ae42
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Tracks the current continuous processing epoch within a task. Call
+ * EpochTracker.getCurrentEpoch to get the current epoch.
+ */
+object EpochTracker {
+  // The current epoch. Note that this is a shared reference; ContinuousWriteRDD.compute() will
+  // update the underlying AtomicLong as it finishes epochs. Other code should only read the value.
+  private val currentEpoch: ThreadLocal[AtomicLong] = new ThreadLocal[AtomicLong] {
+    override def initialValue() = new AtomicLong(-1)
+  }
+
+  /**
+   * Get the current epoch for the current task, or None if the task has no current epoch.
+   */
+  def getCurrentEpoch: Option[Long] = {
+    currentEpoch.get().get() match {
+      case n if n < 0 => None
+      case e => Some(e)
+    }
+  }
+
+  /**
+   * Increment the current epoch for this task thread. Should be called by [[ContinuousWriteRDD]]
+   * between epochs.
+   */
+  def incrementCurrentEpoch(): Unit = {
+    currentEpoch.get().incrementAndGet()
+  }
+
+  /**
+   * Initialize the current epoch for this task thread. Should be called by [[ContinuousWriteRDD]]
+   * at the beginning of a task.
+   */
+  def initializeCurrentEpoch(startEpoch: Long): Unit = {
+    currentEpoch.get().set(startEpoch)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3fabbc57/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index fef792e..4daafa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -47,10 +47,9 @@ import org.apache.spark.util.RpcUtils
  *    ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified
  *    offset within the list, or null if that offset doesn't yet have a record.
  */
-class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
+class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
   extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport {
   private implicit val formats = Serialization.formats(NoTypeHints)
-  private val NUM_PARTITIONS = 2
 
   protected val logicalPlan =
     StreamingRelationV2(this, "memory", Map(), attributes, None)(sqlContext.sparkSession)
@@ -58,7 +57,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   // ContinuousReader implementation
 
   @GuardedBy("this")
-  private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A])
+  private val records = Seq.fill(numPartitions)(new ListBuffer[A])
 
   @GuardedBy("this")
   private var startOffset: ContinuousMemoryStreamOffset = _
@@ -69,17 +68,17 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   def addData(data: TraversableOnce[A]): Offset = synchronized {
     // Distribute data evenly among partition lists.
     data.toSeq.zipWithIndex.map {
-      case (item, index) => records(index % NUM_PARTITIONS) += item
+      case (item, index) => records(index % numPartitions) += item
     }
 
     // The new target offset is the offset where all records in all partitions have been processed.
-    ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, records(i).size)).toMap)
+    ContinuousMemoryStreamOffset((0 until numPartitions).map(i => (i, records(i).size)).toMap)
   }
 
   override def setStartOffset(start: Optional[Offset]): Unit = synchronized {
     // Inferred initial offset is position 0 in each partition.
     startOffset = start.orElse {
-      ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, 0)).toMap)
+      ContinuousMemoryStreamOffset((0 until numPartitions).map(i => (i, 0)).toMap)
     }.asInstanceOf[ContinuousMemoryStreamOffset]
   }
 
@@ -152,6 +151,9 @@ object ContinuousMemoryStream {
 
   def apply[A : Encoder](implicit sqlContext: SQLContext): ContinuousMemoryStream[A] =
     new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
+
+  def singlePartition[A : Encoder](implicit sqlContext: SQLContext): ContinuousMemoryStream[A] =
+    new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext, 1)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3fabbc57/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index 01d8e75..3f11b8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -23,6 +23,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.streaming.continuous.EpochTracker
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -71,8 +72,15 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
       StateStoreId(checkpointLocation, operatorId, partition.index),
       queryRunId)
 
+    // If we're in continuous processing mode, we should get the store version for the current
+    // epoch rather than the one at planning time.
+    val currentVersion = EpochTracker.getCurrentEpoch match {
+      case None => storeVersion
+      case Some(value) => value
+    }
+
     store = StateStore.get(
-      storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,
+      storeProviderId, keySchema, valueSchema, indexOrdinal, currentVersion,
       storeConf, hadoopConfBroadcast.value.value)
     val inputIter = dataRDD.iterator(partition, ctxt)
     storeUpdateFunction(store, inputIter)

http://git-wip-us.apache.org/repos/asf/spark/blob/3fabbc57/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 7cefd03..97da2b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -242,7 +242,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
 
     (sink, trigger) match {
       case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
-        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
+        if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
+          UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
+        }
         new StreamingQueryWrapper(new ContinuousExecution(
           sparkSession,
           userSpecifiedName.orNull,

http://git-wip-us.apache.org/repos/asf/spark/blob/3fabbc57/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
new file mode 100644
index 0000000..b7ef637
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.continuous
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.OutputMode
+
+class ContinuousAggregationSuite extends ContinuousSuiteBase {
+  import testImplicits._
+
+  test("not enabled") {
+    val ex = intercept[AnalysisException] {
+      val input = ContinuousMemoryStream.singlePartition[Int]
+      testStream(input.toDF().agg(max('value)), OutputMode.Complete)()
+    }
+
+    assert(ex.getMessage.contains("Continuous processing does not support Aggregate operations"))
+  }
+
+  test("basic") {
+    withSQLConf(("spark.sql.streaming.unsupportedOperationCheck", "false")) {
+      val input = ContinuousMemoryStream.singlePartition[Int]
+
+      testStream(input.toDF().agg(max('value)), OutputMode.Complete)(
+        AddData(input, 0, 1, 2),
+        CheckAnswer(2),
+        StopStream,
+        AddData(input, 3, 4, 5),
+        StartStream(),
+        CheckAnswer(5),
+        AddData(input, -1, -2, -3),
+        CheckAnswer(5))
+    }
+  }
+
+  test("repeated restart") {
+    withSQLConf(("spark.sql.streaming.unsupportedOperationCheck", "false")) {
+      val input = ContinuousMemoryStream.singlePartition[Int]
+
+      testStream(input.toDF().agg(max('value)), OutputMode.Complete)(
+        AddData(input, 0, 1, 2),
+        CheckAnswer(2),
+        StopStream,
+        StartStream(),
+        StopStream,
+        StartStream(),
+        StopStream,
+        StartStream(),
+        AddData(input, 0),
+        CheckAnswer(2),
+        AddData(input, 5),
+        CheckAnswer(5))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3fabbc57/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index f47d3ec..e663fa8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -51,6 +51,7 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
       startEpoch,
       spark,
       SparkEnv.get)
+    EpochTracker.initializeCurrentEpoch(0)
   }
 
   override def afterEach(): Unit = {

