This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bae5baa  [SPARK-27642][SS] make v1 offset extend v2 offset
bae5baa is described below

commit bae5baae5281d01dc8c67077b90592be857329bd
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue May 7 23:03:15 2019 -0700

    [SPARK-27642][SS] make v1 offset extend v2 offset
    
    ## What changes were proposed in this pull request?
    
    To move DS v2 to the catalyst module, the v2 offset can no longer depend on the v1 offset, because the v1 offset lives in sql/core. This patch therefore inverts the relationship and makes the v1 offset extend the v2 offset.
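
    For context, a minimal sketch of the inversion (simplified names, not the actual Spark classes):

        // Before: the v2 offset (moving to catalyst) extended the v1 offset (sql/core),
        // which would make catalyst depend on sql/core -- the wrong direction.
        // After this patch, the v1 offset is a thin alias that extends the v2 offset:
        abstract class OffsetV2 {
          def json: String
          override def equals(obj: Any): Boolean = obj match {
            case o: OffsetV2 => json == o.json
            case _ => false
          }
          override def hashCode: Int = json.hashCode
        }
        abstract class OffsetV1 extends OffsetV2  // internal, deprecated alias in sql/core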
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #24538 from cloud-fan/offset.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../spark/sql/kafka010/KafkaContinuousStream.scala |  2 +-
 .../spark/sql/kafka010/KafkaSourceOffset.scala     |  4 +--
 .../spark/sql/execution/streaming/Offset.java      | 42 +++-------------------
 .../sql/sources/v2/reader/streaming/Offset.java    | 11 ++----
 .../spark/sql/execution/streaming/LongOffset.scala | 14 +-------
 .../execution/streaming/MicroBatchExecution.scala  | 10 +++---
 .../spark/sql/execution/streaming/OffsetSeq.scala  |  9 ++---
 .../sql/execution/streaming/OffsetSeqLog.scala     |  3 +-
 .../sql/execution/streaming/StreamExecution.scala  |  4 +--
 .../sql/execution/streaming/StreamProgress.scala   | 19 +++++-----
 .../spark/sql/execution/streaming/memory.scala     | 25 +++++--------
 .../sources/TextSocketMicroBatchStream.scala       |  5 +--
 .../apache/spark/sql/streaming/StreamTest.scala    |  8 ++---
 13 files changed, 49 insertions(+), 107 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index d60ee1c..92686d2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -76,7 +76,7 @@ class KafkaContinuousStream(
   }
 
   override def planInputPartitions(start: Offset): Array[InputPartition] = {
-    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start)
+    val oldStartPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
 
     val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
     val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index 8d41c0d..90d7043 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -20,14 +20,14 @@ package org.apache.spark.sql.kafka010
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
  * their offsets.
  */
 private[kafka010]
-case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
+case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
 
   override val json = JsonUtils.partitionOffsets(partitionToOffsets)
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/Offset.java
index 43ad4b3..7c167dc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/Offset.java
@@ -18,44 +18,10 @@
 package org.apache.spark.sql.execution.streaming;
 
 /**
- * This is an internal, deprecated interface. New source implementations should use the
- * org.apache.spark.sql.sources.v2.reader.streaming.Offset class, which is the one that will be
- * supported in the long term.
+ * This class is an alias of {@link org.apache.spark.sql.sources.v2.reader.streaming.Offset}. It's
+ * internal and deprecated. New streaming data source implementations should use data source v2 API,
+ * which will be supported in the long term.
  *
  * This class will be removed in a future release.
  */
-public abstract class Offset {
-    /**
-     * A JSON-serialized representation of an Offset that is
-     * used for saving offsets to the offset log.
-     * Note: We assume that equivalent/equal offsets serialize to
-     * identical JSON strings.
-     *
-     * @return JSON string encoding
-     */
-    public abstract String json();
-
-    /**
-     * Equality based on JSON string representation. We leverage the
-     * JSON representation for normalization between the Offset's
-     * in memory and on disk representations.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof Offset) {
-            return this.json().equals(((Offset) obj).json());
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return this.json().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return this.json();
-    }
-}
+public abstract class Offset extends org.apache.spark.sql.sources.v2.reader.streaming.Offset {}
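
With the alias in place, any existing v1 offset instance is automatically a v2 offset as well,
so code typed against the v2 class keeps working. A hedged sketch of the effect (MyV1Offset is
hypothetical, not part of this patch):

    // A v1-style offset; under the new hierarchy it is-a v2 Offset too.
    case class MyV1Offset(n: Long) extends org.apache.spark.sql.execution.streaming.Offset {
      override val json: String = n.toString
    }
    val v2: org.apache.spark.sql.sources.v2.reader.streaming.Offset = MyV1Offset(42)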
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
index a066713..1d34fdd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
@@ -25,13 +25,9 @@ import org.apache.spark.annotation.Evolving;
  * During execution, offsets provided by the data source implementation will be logged and used as
  * restart checkpoints. Each source should provide an offset implementation which the source can use
  * to reconstruct a position in the stream up to which data has been seen/processed.
- *
- * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to
- * maintain compatibility with DataSource V1 APIs. This extension will be removed once we
- * get rid of V1 completely.
  */
 @Evolving
-public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
+public abstract class Offset {
     /**
      * A JSON-serialized representation of an Offset that is
      * used for saving offsets to the offset log.
@@ -49,9 +45,8 @@ public abstract class Offset extends org.apache.spark.sql.execution.streaming.Of
      */
     @Override
     public boolean equals(Object obj) {
-        if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
-            return this.json()
-                .equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
+        if (obj instanceof Offset) {
+            return this.json().equals(((Offset) obj).json());
         } else {
             return false;
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index 3ff5b86..a27898c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -17,12 +17,10 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
-
 /**
  * A simple offset for sources that produce a single linear stream of data.
  */
-case class LongOffset(offset: Long) extends OffsetV2 {
+case class LongOffset(offset: Long) extends Offset {
 
   override val json = offset.toString
 
@@ -37,14 +35,4 @@ object LongOffset {
    * @return new LongOffset
    */
   def apply(offset: SerializedOffset) : LongOffset = new LongOffset(offset.json.toLong)
-
-  /**
-   * Convert generic Offset to LongOffset if possible.
-   * @return converted LongOffset
-   */
-  def convert(offset: Offset): Option[LongOffset] = offset match {
-    case lo: LongOffset => Some(lo)
-    case so: SerializedOffset => Some(LongOffset(so))
-    case _ => None
-  }
 }
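
Since LongOffset.convert is gone, call sites that used to tolerate a SerializedOffset now
downcast directly, as the memory.scala and TextSocketMicroBatchStream hunks below show. A
minimal sketch of the new pattern (commitSketch is a hypothetical stand-in for commit()):

    import org.apache.spark.sql.execution.streaming.LongOffset
    import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

    def commitSketch(end: OffsetV2): Unit = {
      // The stream trusts the engine to hand back the concrete offset type it produced;
      // a foreign offset type now fails with a ClassCastException instead of sys.error.
      val newOffset = end.asInstanceOf[LongOffset]
      println(newOffset.offset)
    }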
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 58c265d..7a3cdbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -296,7 +296,7 @@ class MicroBatchExecution(
                * batch will be executed before getOffset is called again. */
               availableOffsets.foreach {
                 case (source: Source, end: Offset) =>
-                  val start = committedOffsets.get(source)
+                  val start = committedOffsets.get(source).map(_.asInstanceOf[Offset])
                   source.getBatch(start, end)
                 case nonV1Tuple =>
                   // The V2 API does not have the same edge case requiring getBatch to be called
@@ -354,7 +354,7 @@ class MicroBatchExecution(
     if (isCurrentBatchConstructed) return true
 
     // Generate a map from each unique source to the next available offset.
-    val latestOffsets: Map[SparkDataStream, Option[Offset]] = uniqueSources.map {
+    val latestOffsets: Map[SparkDataStream, Option[OffsetV2]] = uniqueSources.map {
       case s: Source =>
         updateStatusMessage(s"Getting offsets from $s")
         reportTimeTaken("getOffset") {
@@ -411,7 +411,7 @@ class MicroBatchExecution(
           val prevBatchOff = offsetLog.get(currentBatchId - 1)
           if (prevBatchOff.isDefined) {
             prevBatchOff.get.toStreamProgress(sources).foreach {
-              case (src: Source, off) => src.commit(off)
+              case (src: Source, off: Offset) => src.commit(off)
               case (stream: MicroBatchStream, off) =>
                 stream.commit(stream.deserializeOffset(off.json))
               case (src, _) =>
@@ -448,9 +448,9 @@ class MicroBatchExecution(
     // Request unprocessed data from all sources.
     newData = reportTimeTaken("getBatch") {
       availableOffsets.flatMap {
-        case (source: Source, available)
+        case (source: Source, available: Offset)
           if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
-          val current = committedOffsets.get(source)
+          val current = committedOffsets.get(source).map(_.asInstanceOf[Offset])
           val batch = source.getBatch(current, available)
           assert(batch.isStreaming,
             s"DataFrame returned by getBatch from $source did not have 
isStreaming=true\n" +
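
The engine-side pattern throughout this file: progress maps now hold v2 offsets, and the code
narrows to the v1 Offset only at the v1 Source boundary. A hedged sketch of that narrowing
(callV1Source is a hypothetical helper, not in the patch):

    import org.apache.spark.sql.execution.streaming.{Offset => OffsetV1, Source}
    import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

    // Safe because v1 sources only ever report v1 offsets, which (post-patch)
    // are a subtype of the v2 Offset.
    def callV1Source(source: Source, start: Option[OffsetV2], end: OffsetV2) =
      source.getBatch(start.map(_.asInstanceOf[OffsetV1]), end.asInstanceOf[OffsetV1])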
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 0f7ad75..b6fa2e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -24,14 +24,15 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager}
 import org.apache.spark.sql.internal.SQLConf.{FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, _}
-import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}
+
 
 /**
  * An ordered collection of offsets, used to track the progress of processing data from one or more
  * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
  * vector clock that must progress linearly forward.
  */
-case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {
+case class OffsetSeq(offsets: Seq[Option[OffsetV2]], metadata: Option[OffsetSeqMetadata] = None) {
 
   /**
    * Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of
@@ -57,13 +58,13 @@ object OffsetSeq {
    * Returns a [[OffsetSeq]] with a variable sequence of offsets.
    * `nulls` in the sequence are converted to `None`s.
    */
-  def fill(offsets: Offset*): OffsetSeq = OffsetSeq.fill(None, offsets: _*)
+  def fill(offsets: OffsetV2*): OffsetSeq = OffsetSeq.fill(None, offsets: _*)
 
   /**
    * Returns a [[OffsetSeq]] with metadata and a variable sequence of offsets.
    * `nulls` in the sequence are converted to `None`s.
    */
-  def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = {
+  def fill(metadata: Option[String], offsets: OffsetV2*): OffsetSeq = {
     OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply))
   }
 }
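
Usage of fill() is unchanged; only the element type widens to the v2 Offset. A small sketch
(assuming it runs inside Spark's own sql packages or test scope, since these classes are internal):

    import org.apache.spark.sql.execution.streaming.{LongOffset, OffsetSeq}

    // nulls in the varargs become Nones, per the fill() contract quoted above
    val seq: OffsetSeq = OffsetSeq.fill(LongOffset(10), null, LongOffset(20))
    assert(seq.offsets == Seq(Some(LongOffset(10)), None, Some(LongOffset(20))))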
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index 2c8d7c7..8a05dad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets._
 import scala.io.{Source => IOSource}
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 
 /**
  * This class is used to log offsets to persistent files in HDFS.
@@ -47,7 +48,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
 
   override protected def deserialize(in: InputStream): OffsetSeq = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    def parseOffset(value: String): Offset = value match {
+    def parseOffset(value: String): OffsetV2 = value match {
       case OffsetSeqLog.SERIALIZED_VOID_OFFSET => null
       case json => SerializedOffset(json)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5d66b61..4c08b3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.{SupportsWrite, Table}
-import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}
 import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming._
@@ -438,7 +438,7 @@ abstract class StreamExecution(
    * Blocks the current thread until processing for data from the given `source` has reached at
    * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
-  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset, timeoutMs: Long): Unit = {
+  private[sql] def awaitOffset(sourceIndex: Int, newOffset: OffsetV2, timeoutMs: Long): Unit = {
     assertAwaitThread()
     def notDone = {
       val localCommittedOffsets = committedOffsets
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index 8a1d064..8783eaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -19,15 +19,16 @@ package org.apache.spark.sql.execution.streaming
 
 import scala.collection.{immutable, GenTraversableOnce}
 
-import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}
+
 
 /**
  * A helper class that looks like a Map[Source, Offset].
  */
 class StreamProgress(
-    val baseMap: immutable.Map[SparkDataStream, Offset] =
-        new immutable.HashMap[SparkDataStream, Offset])
-  extends scala.collection.immutable.Map[SparkDataStream, Offset] {
+    val baseMap: immutable.Map[SparkDataStream, OffsetV2] =
+        new immutable.HashMap[SparkDataStream, OffsetV2])
+  extends scala.collection.immutable.Map[SparkDataStream, OffsetV2] {
 
   def toOffsetSeq(source: Seq[SparkDataStream], metadata: OffsetSeqMetadata): OffsetSeq = {
     OffsetSeq(source.map(get), Some(metadata))
@@ -36,17 +37,17 @@ class StreamProgress(
   override def toString: String =
     baseMap.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
 
-  override def +[B1 >: Offset](kv: (SparkDataStream, B1)): Map[SparkDataStream, B1] = {
+  override def +[B1 >: OffsetV2](kv: (SparkDataStream, B1)): Map[SparkDataStream, B1] = {
     baseMap + kv
   }
 
-  override def get(key: SparkDataStream): Option[Offset] = baseMap.get(key)
+  override def get(key: SparkDataStream): Option[OffsetV2] = baseMap.get(key)
 
-  override def iterator: Iterator[(SparkDataStream, Offset)] = baseMap.iterator
+  override def iterator: Iterator[(SparkDataStream, OffsetV2)] = baseMap.iterator
 
-  override def -(key: SparkDataStream): Map[SparkDataStream, Offset] = baseMap - key
+  override def -(key: SparkDataStream): Map[SparkDataStream, OffsetV2] = baseMap - key
 
-  def ++(updates: GenTraversableOnce[(SparkDataStream, Offset)]): StreamProgress = {
+  def ++(updates: GenTraversableOnce[(SparkDataStream, OffsetV2)]): StreamProgress = {
     new StreamProgress(baseMap ++ updates)
   }
 }
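
The + method keeps the B1 >: OffsetV2 lower bound because immutable.Map[K, +V] is covariant in
its value type, so any Map implementation must accept supertypes of it. A generic illustration
of that Scala 2.12 collections rule (hypothetical IntProgress, not Spark code):

    import scala.collection.immutable

    // immutable.Map declares def +[B1 >: B](kv: (A, B1)): Map[A, B1], so an
    // implementation that fixes the value type must keep the lower bound.
    class IntProgress(base: immutable.Map[String, Int] = immutable.HashMap.empty)
      extends immutable.Map[String, Int] {
      override def get(key: String): Option[Int] = base.get(key)
      override def iterator: Iterator[(String, Int)] = base.iterator
      override def +[B1 >: Int](kv: (String, B1)): immutable.Map[String, B1] = base + kv
      override def -(key: String): immutable.Map[String, Int] = base - key
    }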
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 022c8da..df14955 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -61,10 +61,12 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa
     Dataset.ofRows(sqlContext.sparkSession, logicalPlan)
   }
 
-  def addData(data: A*): Offset = {
+  def addData(data: A*): OffsetV2 = {
     addData(data.toTraversable)
   }
 
+  def addData(data: TraversableOnce[A]): OffsetV2
+
   def fullSchema(): StructType = encoder.schema
 
   protected val logicalPlan: LogicalPlan = {
@@ -77,8 +79,6 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa
       None)(sqlContext.sparkSession)
   }
 
-  def addData(data: TraversableOnce[A]): Offset
-
   override def initialOffset(): OffsetV2 = {
     throw new IllegalStateException("should not be called.")
   }
@@ -226,22 +226,15 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }
 
   override def commit(end: OffsetV2): Unit = synchronized {
-    def check(newOffset: LongOffset): Unit = {
-      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+    val newOffset = end.asInstanceOf[LongOffset]
+    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
 
-      if (offsetDiff < 0) {
-        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
-      }
-
-      batches.trimStart(offsetDiff)
-      lastOffsetCommitted = newOffset
+    if (offsetDiff < 0) {
+      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
     }
 
-    LongOffset.convert(end) match {
-      case Some(lo) => check(lo)
-      case None => sys.error(s"MemoryStream.commit() received an offset ($end) " +
-        "that did not originate with an instance of this class")
-    }
+    batches.trimStart(offsetDiff)
+    lastOffsetCommitted = newOffset
   }
 
   override def stop() {}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index dab64e1..25e9af2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -150,10 +150,7 @@ class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int)
     }
 
   override def commit(end: Offset): Unit = synchronized {
-    val newOffset = LongOffset.convert(end).getOrElse(
-      sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
-        s"originate with an instance of this class")
-    )
+    val newOffset = end.asInstanceOf[LongOffset]
 
     val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 89c62ba..3a4414f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
 import org.apache.spark.sql.execution.streaming.sources.MemorySink
 import org.apache.spark.sql.execution.streaming.state.StateStore
-import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -124,7 +124,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
     * the active query, and then return the source object the data was added, as well as the
      * offset of added data.
      */
-    def addData(query: Option[StreamExecution]): (SparkDataStream, Offset)
+    def addData(query: Option[StreamExecution]): (SparkDataStream, OffsetV2)
   }
 
   /** A trait that can be extended when testing a source. */
@@ -135,7 +135,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
   case class AddDataMemory[A](source: MemoryStreamBase[A], data: Seq[A]) extends AddData {
     override def toString: String = s"AddData to $source: ${data.mkString(",")}"
 
-    override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
+    override def addData(query: Option[StreamExecution]): (SparkDataStream, OffsetV2) = {
       (source, source.addData(data))
     }
   }
@@ -337,7 +337,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
     var pos = 0
     var currentStream: StreamExecution = null
     var lastStream: StreamExecution = null
-    val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
+    val awaiting = new mutable.HashMap[Int, OffsetV2]() // source index -> offset to wait for
     val sink = new MemorySink
     val resetConfValues = mutable.Map[String, Option[String]]()
     val defaultCheckpointLocation =

