Repository: spark
Updated Branches:
  refs/heads/master 7570eab6b -> 7798c9e6e


[SPARK-22824] Restore old offset for binary compatibility

## What changes were proposed in this pull request?

Some users depend on source compatibility with the 
org.apache.spark.sql.execution.streaming.Offset class. Although this is not a 
stable interface, we can keep it in place for now to simplify upgrades to 2.3.
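
To illustrate the compatibility this preserves: a third-party Source written against the internal API before 2.3 keeps compiling unchanged. A minimal sketch, with the CounterSource class and its schema invented for illustration:

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Hypothetical third-party source compiled against the internal (unstable) API.
    // It continues to compile on 2.3 because execution.streaming.Offset is restored.
    class CounterSource(sqlContext: SQLContext) extends Source {
      @volatile private var highWatermark = 0L

      override def schema: StructType =
        StructType(StructField("value", LongType) :: Nil)

      // Still returns the old execution.streaming.Offset type.
      override def getOffset: Option[Offset] = {
        highWatermark += 1
        Some(LongOffset(highWatermark))
      }

      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
        val from = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(0L)
        val to = end.asInstanceOf[LongOffset].offset
        sqlContext.range(from, to).toDF("value")
      }

      override def stop(): Unit = {}
    }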

Author: Jose Torres <j...@databricks.com>

Closes #20012 from joseph-torres/binary-compat.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7798c9e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7798c9e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7798c9e6

Branch: refs/heads/master
Commit: 7798c9e6ef55dbadfc9eb896fa3f366c76dc187b
Parents: 7570eab
Author: Jose Torres <j...@databricks.com>
Authored: Wed Dec 20 10:43:10 2017 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Wed Dec 20 10:43:10 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaSource.scala |   1 -
 .../spark/sql/kafka010/KafkaSourceOffset.scala  |   3 +-
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |   1 -
 .../spark/sql/sources/v2/reader/Offset.java     |   6 +-
 .../execution/streaming/FileStreamSource.scala  |   1 -
 .../streaming/FileStreamSourceOffset.scala      |   2 -
 .../sql/execution/streaming/LongOffset.scala    |   2 -
 .../streaming/MicroBatchExecution.scala         |   1 -
 .../spark/sql/execution/streaming/Offset.java   |  61 +++++++
 .../spark/sql/execution/streaming/Offset.scala  |  32 ----
 .../sql/execution/streaming/OffsetSeq.scala     |   1 -
 .../sql/execution/streaming/OffsetSeqLog.scala  |   1 -
 .../streaming/RateSourceProvider.scala          |   3 +-
 .../execution/streaming/RateStreamOffset.scala  |   4 +-
 .../streaming/RateStreamSourceV2.scala          | 162 -----------------
 .../execution/streaming/SerializedOffset.scala  |  29 +++
 .../spark/sql/execution/streaming/Source.scala  |   1 -
 .../execution/streaming/StreamExecution.scala   |   1 -
 .../execution/streaming/StreamProgress.scala    |   2 -
 .../continuous/ContinuousRateStreamSource.scala |   3 +-
 .../spark/sql/execution/streaming/memory.scala  |   1 -
 .../sql/execution/streaming/memoryV2.scala      | 178 ------------------
 .../spark/sql/execution/streaming/socket.scala  |   1 -
 .../streaming/sources/RateStreamSourceV2.scala  | 165 +++++++++++++++++
 .../execution/streaming/sources/memoryV2.scala  | 179 +++++++++++++++++++
 .../execution/streaming/MemorySinkV2Suite.scala |   1 +
 .../execution/streaming/RateSourceV2Suite.scala |   1 +
 .../sql/streaming/FileStreamSourceSuite.scala   |   1 -
 .../spark/sql/streaming/OffsetSuite.scala       |   3 +-
 .../spark/sql/streaming/StreamSuite.scala       |   1 -
 .../apache/spark/sql/streaming/StreamTest.scala |   1 -
 .../streaming/StreamingAggregationSuite.scala   |   1 -
 .../streaming/StreamingQueryListenerSuite.scala |   1 -
 .../sql/streaming/StreamingQuerySuite.scala     |   2 -
 .../test/DataStreamReaderWriterSuite.scala      |   1 -
 .../sql/streaming/util/BlockingSource.scala     |   3 +-
 36 files changed, 448 insertions(+), 409 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 87f31fc..e9cff04 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index 6e24423..b5da415 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.sql.execution.streaming.SerializedOffset
-import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 9cac0e5..2034b9b 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
index 1ebd353..ce1c489 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
@@ -23,7 +23,7 @@ package org.apache.spark.sql.sources.v2.reader;
  * restart checkpoints. Sources should provide an Offset implementation which they can use to
  * reconstruct the stream position where the offset was taken.
  */
-public abstract class Offset {
+public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
     /**
      * A JSON-serialized representation of an Offset that is
      * used for saving offsets to the offset log.
@@ -41,8 +41,8 @@ public abstract class Offset {
      */
     @Override
     public boolean equals(Object obj) {
-        if (obj instanceof Offset) {
-            return this.json().equals(((Offset) obj).json());
+        if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
+            return this.json().equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
         } else {
             return false;
         }
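
Since the public class now subclasses the internal one and equality stays keyed on the JSON form, an old-style offset and a v2 offset that serialize identically compare equal in both directions. A small sketch; the two offset classes below are hypothetical:

    import org.apache.spark.sql.execution.streaming.{Offset => LegacyOffset}
    import org.apache.spark.sql.sources.v2.reader.{Offset => V2Offset}

    // Hypothetical offsets that share the same JSON encoding. Neither gets a
    // synthesized case-class equals, because a concrete equals is inherited from
    // the abstract Offset base class, so the JSON-based comparison applies.
    case class LegacyLong(value: Long) extends LegacyOffset {
      override def json: String = value.toString
    }
    case class V2Long(value: Long) extends V2Offset {
      override def json: String = value.toString
    }

    assert(LegacyLong(5) == V2Long(5))  // a v2 offset is an instance of the legacy class
    assert(V2Long(5) == LegacyLong(5))  // the legacy offset passes the relaxed instanceof check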

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index a33b785..0debd7d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types.StructType
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
index 431e5b9..a2b49d94 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
@@ -22,8 +22,6 @@ import scala.util.control.Exception._
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.sql.sources.v2.reader.Offset
-
 /**
  * Offset for the [[FileStreamSource]].
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index 7ea3146..5f0b195 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.sql.sources.v2.reader.Offset
-
 /**
  * A simple offset for sources that produce a single linear stream of data.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index a67dda9..4a3de8b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, Utils}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java
new file mode 100644
index 0000000..80aa550
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming;
+
+/**
+ * This is an internal, deprecated interface. New source implementations should use the
+ * org.apache.spark.sql.sources.v2.reader.Offset class, which is the one that will be supported
+ * in the long term.
+ *
+ * This class will be removed in a future release.
+ */
+public abstract class Offset {
+    /**
+     * A JSON-serialized representation of an Offset that is
+     * used for saving offsets to the offset log.
+     * Note: We assume that equivalent/equal offsets serialize to
+     * identical JSON strings.
+     *
+     * @return JSON string encoding
+     */
+    public abstract String json();
+
+    /**
+     * Equality based on JSON string representation. We leverage the
+     * JSON representation for normalization between the Offset's
+     * in memory and on disk representations.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof Offset) {
+            return this.json().equals(((Offset) obj).json());
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return this.json().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return this.json();
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
deleted file mode 100644
index 73f0c62..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.apache.spark.sql.sources.v2.reader.Offset
-
-
-/**
- * Used when loading a JSON serialized offset from external storage.
- * We are currently not responsible for converting JSON serialized
- * data into an internal (i.e., object) representation. Sources should
- * define a factory method in their source Offset companion objects
- * that accepts a [[SerializedOffset]] for doing the conversion.
- */
-case class SerializedOffset(override val json: String) extends Offset
-
-

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index dcc5935..4e0a468 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -23,7 +23,6 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.internal.SQLConf.{SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS}
-import org.apache.spark.sql.sources.v2.reader.Offset
 
 /**
  * An ordered collection of offsets, used to track the progress of processing data from one or more

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index bfdbc65..e3f4abc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets._
 import scala.io.{Source => IOSource}
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.sources.v2.reader.Offset
 
 /**
  * This class is used to log offsets to persistent files in HDFS.

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index 50671a4..4176132 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -30,9 +30,10 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamReader
+import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader, Offset}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{ManualClock, SystemClock}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
index 13679df..726d857 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.execution.streaming
 import org.json4s.DefaultFormats
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.sources.v2
 
 case class RateStreamOffset(partitionToValueAndRunTimeMs: Map[Int, (Long, Long)])
-  extends Offset {
+  extends v2.reader.Offset {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
   override val json = Serialization.write(partitionToValueAndRunTimeMs)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala
deleted file mode 100644
index 102551c..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import java.util.Optional
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.json4s.DefaultFormats
-import org.json4s.jackson.Serialization
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
-import org.apache.spark.util.SystemClock
-
-class RateStreamV2Reader(options: DataSourceV2Options)
-  extends MicroBatchReader {
-  implicit val defaultFormats: DefaultFormats = DefaultFormats
-
-  val clock = new SystemClock
-
-  private val numPartitions =
-    options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
-  private val rowsPerSecond =
-    options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong
-
-  // The interval (in milliseconds) between rows in each partition.
-  // e.g. if there are 4 global rows per second, and 2 partitions, each 
partition
-  // should output rows every (1000 * 2 / 4) = 500 ms.
-  private val msPerPartitionBetweenRows = (1000 * numPartitions) / 
rowsPerSecond
-
-  override def readSchema(): StructType = {
-    StructType(
-      StructField("timestamp", TimestampType, false) ::
-      StructField("value", LongType, false) :: Nil)
-  }
-
-  val creationTimeMs = clock.getTimeMillis()
-
-  private var start: RateStreamOffset = _
-  private var end: RateStreamOffset = _
-
-  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): 
Unit = {
-    this.start = start.orElse(
-      RateStreamSourceV2.createInitialOffset(numPartitions, creationTimeMs))
-      .asInstanceOf[RateStreamOffset]
-
-    this.end = end.orElse {
-      val currentTime = clock.getTimeMillis()
-      RateStreamOffset(
-        this.start.partitionToValueAndRunTimeMs.map {
-          case startOffset @ (part, (currentVal, currentReadTime)) =>
-            // Calculate the number of rows we should advance in this 
partition (based on the
-            // current time), and output a corresponding offset.
-            val readInterval = currentTime - currentReadTime
-            val numNewRows = readInterval / msPerPartitionBetweenRows
-            if (numNewRows <= 0) {
-              startOffset
-            } else {
-              (part,
-                (currentVal + (numNewRows * numPartitions),
-                currentReadTime + (numNewRows * msPerPartitionBetweenRows)))
-            }
-        }
-      )
-    }.asInstanceOf[RateStreamOffset]
-  }
-
-  override def getStartOffset(): Offset = {
-    if (start == null) throw new IllegalStateException("start offset not set")
-    start
-  }
-  override def getEndOffset(): Offset = {
-    if (end == null) throw new IllegalStateException("end offset not set")
-    end
-  }
-
-  override def deserializeOffset(json: String): Offset = {
-    RateStreamOffset(Serialization.read[Map[Int, (Long, Long)]](json))
-  }
-
-  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
-    val startMap = start.partitionToValueAndRunTimeMs
-    val endMap = end.partitionToValueAndRunTimeMs
-    endMap.keys.toSeq.map { part =>
-      val (endVal, _) = endMap(part)
-      val (startVal, startTimeMs) = startMap(part)
-
-      val packedRows = mutable.ListBuffer[(Long, Long)]()
-      var outVal = startVal + numPartitions
-      var outTimeMs = startTimeMs + msPerPartitionBetweenRows
-      while (outVal <= endVal) {
-        packedRows.append((outTimeMs, outVal))
-        outVal += numPartitions
-        outTimeMs += msPerPartitionBetweenRows
-      }
-
-      RateStreamBatchTask(packedRows).asInstanceOf[ReadTask[Row]]
-    }.toList.asJava
-  }
-
-  override def commit(end: Offset): Unit = {}
-  override def stop(): Unit = {}
-}
-
-case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends ReadTask[Row] {
-  override def createDataReader(): DataReader[Row] = new 
RateStreamBatchReader(vals)
-}
-
-class RateStreamBatchReader(vals: Seq[(Long, Long)]) extends DataReader[Row] {
-  var currentIndex = -1
-
-  override def next(): Boolean = {
-    // Return true as long as the new index is in the seq.
-    currentIndex += 1
-    currentIndex < vals.size
-  }
-
-  override def get(): Row = {
-    Row(
-      
DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(vals(currentIndex)._1)),
-      vals(currentIndex)._2)
-  }
-
-  override def close(): Unit = {}
-}
-
-object RateStreamSourceV2 {
-  val NUM_PARTITIONS = "numPartitions"
-  val ROWS_PER_SECOND = "rowsPerSecond"
-
-  private[sql] def createInitialOffset(numPartitions: Int, creationTimeMs: 
Long) = {
-    RateStreamOffset(
-      Range(0, numPartitions).map { i =>
-        // Note that the starting offset is exclusive, so we have to decrement 
the starting value
-        // by the increment that will later be applied. The first row output 
in each
-        // partition will have a value equal to the partition index.
-        (i,
-          ((i - numPartitions).toLong,
-            creationTimeMs))
-      }.toMap)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala
new file mode 100644
index 0000000..129cfed
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+/**
+ * Used when loading a JSON serialized offset from external storage.
+ * We are currently not responsible for converting JSON serialized
+ * data into an internal (i.e., object) representation. Sources should
+ * define a factory method in their source Offset companion objects
+ * that accepts a [[SerializedOffset]] for doing the conversion.
+ */
+case class SerializedOffset(override val json: String) extends Offset
+
+
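
The factory-method pattern mentioned in the scaladoc typically looks like the sketch below; MyOffset is hypothetical, while KafkaSourceOffset and FileStreamSourceOffset in this patch follow the same shape:

    import org.json4s.NoTypeHints
    import org.json4s.jackson.Serialization

    import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}

    // Hypothetical offset tracking a single watermark, serialized as JSON.
    case class MyOffset(watermark: Long) extends Offset {
      override def json: String = Serialization.write(this)(MyOffset.format)
    }

    object MyOffset {
      implicit val format = Serialization.formats(NoTypeHints)

      // Converts the raw JSON loaded from the offset log back into the
      // source's in-memory representation.
      def apply(serialized: SerializedOffset): MyOffset =
        Serialization.read[MyOffset](serialized.json)
    }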

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index dbb408f..311942f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types.StructType
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 7946889..129995d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index 770db40..a3f3662 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming
 
 import scala.collection.{immutable, GenTraversableOnce}
 
-import org.apache.spark.sql.sources.v2.reader.Offset
-
 /**
  * A helper class that looks like a Map[Source, Offset].
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 77fc267..4c3a1ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -25,7 +25,8 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset}
+import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index db07175..3041d4d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala
deleted file mode 100644
index 437040c..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import javax.annotation.concurrent.GuardedBy
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.util.control.NonFatal
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, 
Complete, Update}
-import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, 
DataSourceV2Options, MicroBatchWriteSupport}
-import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.StructType
-
-/**
- * A sink that stores the results in memory. This [[Sink]] is primarily 
intended for use in unit
- * tests and does not provide durability.
- */
-class MemorySinkV2 extends DataSourceV2
-  with MicroBatchWriteSupport with ContinuousWriteSupport with Logging {
-
-  override def createMicroBatchWriter(
-      queryId: String,
-      batchId: Long,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = {
-    java.util.Optional.of(new MemoryWriter(this, batchId, mode))
-  }
-
-  override def createContinuousWriter(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = {
-    java.util.Optional.of(new ContinuousMemoryWriter(this, mode))
-  }
-
-  private case class AddedData(batchId: Long, data: Array[Row])
-
-  /** An order list of batches that have been written to this [[Sink]]. */
-  @GuardedBy("this")
-  private val batches = new ArrayBuffer[AddedData]()
-
-  /** Returns all rows that are stored in this [[Sink]]. */
-  def allData: Seq[Row] = synchronized {
-    batches.flatMap(_.data)
-  }
-
-  def latestBatchId: Option[Long] = synchronized {
-    batches.lastOption.map(_.batchId)
-  }
-
-  def latestBatchData: Seq[Row] = synchronized {
-    batches.lastOption.toSeq.flatten(_.data)
-  }
-
-  def toDebugString: String = synchronized {
-    batches.map { case AddedData(batchId, data) =>
-      val dataStr = try data.mkString(" ") catch {
-        case NonFatal(e) => "[Error converting to string]"
-      }
-      s"$batchId: $dataStr"
-    }.mkString("\n")
-  }
-
-  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit 
= {
-    val notCommitted = synchronized {
-      latestBatchId.isEmpty || batchId > latestBatchId.get
-    }
-    if (notCommitted) {
-      logDebug(s"Committing batch $batchId to $this")
-      outputMode match {
-        case Append | Update =>
-          val rows = AddedData(batchId, newRows)
-          synchronized { batches += rows }
-
-        case Complete =>
-          val rows = AddedData(batchId, newRows)
-          synchronized {
-            batches.clear()
-            batches += rows
-          }
-
-        case _ =>
-          throw new IllegalArgumentException(
-            s"Output mode $outputMode is not supported by MemorySink")
-      }
-    } else {
-      logDebug(s"Skipping already committed batch: $batchId")
-    }
-  }
-
-  def clear(): Unit = synchronized {
-    batches.clear()
-  }
-
-  override def toString(): String = "MemorySink"
-}
-
-case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends 
WriterCommitMessage {}
-
-class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode)
-  extends DataSourceV2Writer with Logging {
-
-  override def createWriterFactory: MemoryWriterFactory = 
MemoryWriterFactory(outputMode)
-
-  def commit(messages: Array[WriterCommitMessage]): Unit = {
-    val newRows = messages.flatMap {
-      case message: MemoryWriterCommitMessage => message.data
-    }
-    sink.write(batchId, outputMode, newRows)
-  }
-
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    // Don't accept any of the new input.
-  }
-}
-
-class ContinuousMemoryWriter(val sink: MemorySinkV2, outputMode: OutputMode)
-  extends ContinuousWriter {
-
-  override def createWriterFactory: MemoryWriterFactory = 
MemoryWriterFactory(outputMode)
-
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {
-    val newRows = messages.flatMap {
-      case message: MemoryWriterCommitMessage => message.data
-    }
-    sink.write(epochId, outputMode, newRows)
-  }
-
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    // Don't accept any of the new input.
-  }
-}
-
-case class MemoryWriterFactory(outputMode: OutputMode) extends 
DataWriterFactory[Row] {
-  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] 
= {
-    new MemoryDataWriter(partitionId, outputMode)
-  }
-}
-
-class MemoryDataWriter(partition: Int, outputMode: OutputMode)
-  extends DataWriter[Row] with Logging {
-
-  private val data = mutable.Buffer[Row]()
-
-  override def write(row: Row): Unit = {
-    data.append(row)
-  }
-
-  override def commit(): MemoryWriterCommitMessage = {
-    val msg = MemoryWriterCommitMessage(partition, data.clone())
-    data.clear()
-    msg
-  }
-
-  override def abort(): Unit = {}
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index 440cae0..0b22cbc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
new file mode 100644
index 0000000..45dc7d7
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.RateStreamOffset
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.SystemClock
+
+class RateStreamV2Reader(options: DataSourceV2Options)
+  extends MicroBatchReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val clock = new SystemClock
+
+  private val numPartitions =
+    options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
+  private val rowsPerSecond =
+    options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong
+
+  // The interval (in milliseconds) between rows in each partition.
+  // e.g. if there are 4 global rows per second, and 2 partitions, each 
partition
+  // should output rows every (1000 * 2 / 4) = 500 ms.
+  private val msPerPartitionBetweenRows = (1000 * numPartitions) / 
rowsPerSecond
+
+  override def readSchema(): StructType = {
+    StructType(
+      StructField("timestamp", TimestampType, false) ::
+      StructField("value", LongType, false) :: Nil)
+  }
+
+  val creationTimeMs = clock.getTimeMillis()
+
+  private var start: RateStreamOffset = _
+  private var end: RateStreamOffset = _
+
+  override def setOffsetRange(
+      start: Optional[Offset],
+      end: Optional[Offset]): Unit = {
+    this.start = start.orElse(
+      RateStreamSourceV2.createInitialOffset(numPartitions, creationTimeMs))
+      .asInstanceOf[RateStreamOffset]
+
+    this.end = end.orElse {
+      val currentTime = clock.getTimeMillis()
+      RateStreamOffset(
+        this.start.partitionToValueAndRunTimeMs.map {
+          case startOffset @ (part, (currentVal, currentReadTime)) =>
+            // Calculate the number of rows we should advance in this 
partition (based on the
+            // current time), and output a corresponding offset.
+            val readInterval = currentTime - currentReadTime
+            val numNewRows = readInterval / msPerPartitionBetweenRows
+            if (numNewRows <= 0) {
+              startOffset
+            } else {
+              (part,
+                (currentVal + (numNewRows * numPartitions),
+                currentReadTime + (numNewRows * msPerPartitionBetweenRows)))
+            }
+        }
+      )
+    }.asInstanceOf[RateStreamOffset]
+  }
+
+  override def getStartOffset(): Offset = {
+    if (start == null) throw new IllegalStateException("start offset not set")
+    start
+  }
+  override def getEndOffset(): Offset = {
+    if (end == null) throw new IllegalStateException("end offset not set")
+    end
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    RateStreamOffset(Serialization.read[Map[Int, (Long, Long)]](json))
+  }
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+    val startMap = start.partitionToValueAndRunTimeMs
+    val endMap = end.partitionToValueAndRunTimeMs
+    endMap.keys.toSeq.map { part =>
+      val (endVal, _) = endMap(part)
+      val (startVal, startTimeMs) = startMap(part)
+
+      val packedRows = mutable.ListBuffer[(Long, Long)]()
+      var outVal = startVal + numPartitions
+      var outTimeMs = startTimeMs + msPerPartitionBetweenRows
+      while (outVal <= endVal) {
+        packedRows.append((outTimeMs, outVal))
+        outVal += numPartitions
+        outTimeMs += msPerPartitionBetweenRows
+      }
+
+      RateStreamBatchTask(packedRows).asInstanceOf[ReadTask[Row]]
+    }.toList.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+}
+
+case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends ReadTask[Row] {
+  override def createDataReader(): DataReader[Row] = new 
RateStreamBatchReader(vals)
+}
+
+class RateStreamBatchReader(vals: Seq[(Long, Long)]) extends DataReader[Row] {
+  var currentIndex = -1
+
+  override def next(): Boolean = {
+    // Return true as long as the new index is in the seq.
+    currentIndex += 1
+    currentIndex < vals.size
+  }
+
+  override def get(): Row = {
+    Row(
+      
DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(vals(currentIndex)._1)),
+      vals(currentIndex)._2)
+  }
+
+  override def close(): Unit = {}
+}
+
+object RateStreamSourceV2 {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+
+  private[sql] def createInitialOffset(numPartitions: Int, creationTimeMs: 
Long) = {
+    RateStreamOffset(
+      Range(0, numPartitions).map { i =>
+        // Note that the starting offset is exclusive, so we have to decrement 
the starting value
+        // by the increment that will later be applied. The first row output 
in each
+        // partition will have a value equal to the partition index.
+        (i,
+          ((i - numPartitions).toLong,
+            creationTimeMs))
+      }.toMap)
+  }
+}
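
As a worked reading of the rate math above, with assumed settings rather than anything from the patch: two partitions at four rows per second give msPerPartitionBetweenRows = (1000 * 2) / 4 = 500, and each emitted row advances a partition's value by numPartitions:

    // Worked example with assumed settings: numPartitions = 2, rowsPerSecond = 4.
    val numPartitions = 2
    val rowsPerSecond = 4L
    val msPerPartitionBetweenRows = (1000 * numPartitions) / rowsPerSecond  // 500 ms

    // Starting offsets are exclusive: partition i starts at i - numPartitions,
    // so the first value emitted by each partition equals its partition index.
    val initial = Map(0 -> -2L, 1 -> -1L)

    // After one second each partition has advanced 1000 / 500 = 2 rows,
    // and every row advances the value by numPartitions.
    val afterOneSecond = initial.map { case (p, v) => (p, v + 2 * numPartitions) }
    // Map(0 -> 2, 1 -> 3): partition 0 emitted values 0 and 2, partition 1 emitted 1 and 3.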

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
new file mode 100644
index 0000000..94c5dd6
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, 
Complete, Update}
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, 
DataSourceV2Options, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A sink that stores the results in memory. This [[Sink]] is primarily 
intended for use in unit
+ * tests and does not provide durability.
+ */
+class MemorySinkV2 extends DataSourceV2
+  with MicroBatchWriteSupport with ContinuousWriteSupport with Logging {
+
+  override def createMicroBatchWriter(
+      queryId: String,
+      batchId: Long,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = {
+    java.util.Optional.of(new MemoryWriter(this, batchId, mode))
+  }
+
+  override def createContinuousWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = {
+    java.util.Optional.of(new ContinuousMemoryWriter(this, mode))
+  }
+
+  private case class AddedData(batchId: Long, data: Array[Row])
+
+  /** An order list of batches that have been written to this [[Sink]]. */
+  @GuardedBy("this")
+  private val batches = new ArrayBuffer[AddedData]()
+
+  /** Returns all rows that are stored in this [[Sink]]. */
+  def allData: Seq[Row] = synchronized {
+    batches.flatMap(_.data)
+  }
+
+  def latestBatchId: Option[Long] = synchronized {
+    batches.lastOption.map(_.batchId)
+  }
+
+  def latestBatchData: Seq[Row] = synchronized {
+    batches.lastOption.toSeq.flatten(_.data)
+  }
+
+  def toDebugString: String = synchronized {
+    batches.map { case AddedData(batchId, data) =>
+      val dataStr = try data.mkString(" ") catch {
+        case NonFatal(e) => "[Error converting to string]"
+      }
+      s"$batchId: $dataStr"
+    }.mkString("\n")
+  }
+
+  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit 
= {
+    val notCommitted = synchronized {
+      latestBatchId.isEmpty || batchId > latestBatchId.get
+    }
+    if (notCommitted) {
+      logDebug(s"Committing batch $batchId to $this")
+      outputMode match {
+        case Append | Update =>
+          val rows = AddedData(batchId, newRows)
+          synchronized { batches += rows }
+
+        case Complete =>
+          val rows = AddedData(batchId, newRows)
+          synchronized {
+            batches.clear()
+            batches += rows
+          }
+
+        case _ =>
+          throw new IllegalArgumentException(
+            s"Output mode $outputMode is not supported by MemorySink")
+      }
+    } else {
+      logDebug(s"Skipping already committed batch: $batchId")
+    }
+  }
+
+  def clear(): Unit = synchronized {
+    batches.clear()
+  }
+
+  override def toString(): String = "MemorySink"
+}
+
+case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends 
WriterCommitMessage {}
+
+class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode)
+  extends DataSourceV2Writer with Logging {
+
+  override def createWriterFactory: MemoryWriterFactory = 
MemoryWriterFactory(outputMode)
+
+  def commit(messages: Array[WriterCommitMessage]): Unit = {
+    val newRows = messages.flatMap {
+      case message: MemoryWriterCommitMessage => message.data
+    }
+    sink.write(batchId, outputMode, newRows)
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    // Don't accept any of the new input.
+  }
+}
+
+class ContinuousMemoryWriter(val sink: MemorySinkV2, outputMode: OutputMode)
+  extends ContinuousWriter {
+
+  override def createWriterFactory: MemoryWriterFactory = 
MemoryWriterFactory(outputMode)
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {
+    val newRows = messages.flatMap {
+      case message: MemoryWriterCommitMessage => message.data
+    }
+    sink.write(epochId, outputMode, newRows)
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    // Don't accept any of the new input.
+  }
+}
+
+case class MemoryWriterFactory(outputMode: OutputMode) extends 
DataWriterFactory[Row] {
+  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] 
= {
+    new MemoryDataWriter(partitionId, outputMode)
+  }
+}
+
+class MemoryDataWriter(partition: Int, outputMode: OutputMode)
+  extends DataWriter[Row] with Logging {
+
+  private val data = mutable.Buffer[Row]()
+
+  override def write(row: Row): Unit = {
+    data.append(row)
+  }
+
+  override def commit(): MemoryWriterCommitMessage = {
+    val msg = MemoryWriterCommitMessage(partition, data.clone())
+    data.clear()
+    msg
+  }
+
+  override def abort(): Unit = {}
+}
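
A quick REPL-style usage sketch of the sink's micro-batch commit path, with assumed values, similar in spirit to what MemorySinkV2Suite exercises:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.streaming.sources.{MemorySinkV2, MemoryWriter, MemoryWriterCommitMessage}
    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
    import org.apache.spark.sql.streaming.OutputMode

    val sink = new MemorySinkV2
    val writer = new MemoryWriter(sink, batchId = 0L, outputMode = OutputMode.Append())

    // Per-partition data writers would normally produce these commit messages.
    writer.commit(Array[WriterCommitMessage](
      MemoryWriterCommitMessage(partition = 0, data = Seq(Row(1), Row(2))),
      MemoryWriterCommitMessage(partition = 1, data = Seq(Row(3)))))

    assert(sink.latestBatchId == Some(0L))
    assert(sink.allData.toSet == Set(Row(1), Row(2), Row(3)))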

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index be4b490..00d4f0b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.sources._
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
 
 class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
index ef801ce..6514c5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamSourceV2, RateStreamV2Reader}
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport}
 import org.apache.spark.sql.streaming.StreamTest
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index f4fa7fa..39bb572 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSQLContext

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
index 4297482..f208f9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.streaming.{LongOffset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset}
 
 trait OffsetSuite extends SparkFunSuite {
   /** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index fa4b2dd..7554903 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreCon
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index fb88c5d..71a474e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStore
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{Clock, SystemClock, Utils}

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 38aa517..97e0651 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.OutputMode._
 import org.apache.spark.sql.streaming.util.{MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index fc9ac2a..9ff02de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler._
 import org.apache.spark.sql.{Encoder, SparkSession}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.util.JsonProtocol

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index ad4d3ab..2fa4595 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -33,12 +33,10 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ManualClock
 
-
 class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging with MockitoSugar {
 
   import AwaitTerminationTester._

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 952908f..aa163d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{ProcessingTime => DeprecatedProcessingTime, _}
 import org.apache.spark.sql.streaming.Trigger._
 import org.apache.spark.sql.types._

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
index 9a35f09..19ab2ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
@@ -20,9 +20,8 @@ package org.apache.spark.sql.streaming.util
 import java.util.concurrent.CountDownLatch
 
 import org.apache.spark.sql.{SQLContext, _}
-import org.apache.spark.sql.execution.streaming.{LongOffset, Sink, Source}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
