[SPARK-22732] Add Structured Streaming APIs to DataSourceV2

## What changes were proposed in this pull request?

This PR adds DataSourceV2 API support for structured streaming, including the 
new pieces needed to support continuous processing [SPARK-20928]. High-level 
summary:

- DataSourceV2 includes new mixins to support micro-batch and continuous reads 
and writes. For reads, we accept an optional user-specified schema rather than 
using the ReadSupportWithSchema model, because adopting that model here would 
severely complicate the interface.

- DataSourceV2Reader includes new interfaces to read a specific micro-batch or 
to read continuously from a given offset. These follow the same setter pattern 
as the existing Supports* mixins so that they can work with 
SupportsScanUnsafeRow.

- DataReader (the per-partition reader) has a new subinterface, 
ContinuousDataReader, used only for continuous processing. This reader adds a 
method to report its current offset, and its next() blocks for new input rather 
than returning false at end-of-data.

- Offset, an abstract representation of position in a streaming query, is 
ported to the public API. (Each type of reader will define its own Offset 
implementation.)

- DataSourceV2Writer has a new subinterface ContinuousWriter only for 
continuous processing. Commits to this interface come tagged with an epoch 
number, as the execution engine will continue to produce new epoch commits as 
the task continues indefinitely.
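
The ContinuousDataReader contract above can be sketched with a toy, in-memory reader (QueueContinuousReader and its queue are hypothetical stand-ins, not part of this PR; a real implementation extends the ContinuousDataReader interface below):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy stand-in mirroring the ContinuousDataReader contract: next() blocks for
// new input rather than returning false at end-of-data, and getOffset()
// reports the offset of the current record, or the start offset if no records
// have been read yet.
class QueueContinuousReader {
  private final BlockingQueue<String> input = new LinkedBlockingQueue<>();
  private String current;
  private long offset = 0;  // start offset before any record is read

  void feed(String record) { input.add(record); }

  // Blocks until a record arrives; a continuous-mode task runs indefinitely.
  boolean next() {
    try {
      current = input.take();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    offset++;
    return true;
  }

  String get() { return current; }

  // Analogous to ContinuousDataReader.getOffset(): the engine calls this
  // alongside get() to track progress for epoch checkpoints.
  long getOffset() { return offset; }
}
```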

Note that this PR does not propose to change the existing DataSourceV2 batch 
API, or deprecate the existing streaming source/sink internal APIs in 
spark.sql.execution.streaming.

## How was this patch tested?

Toy implementations of the new interfaces with unit tests.

Author: Jose Torres <[email protected]>

Closes #19925 from joseph-torres/continuous-api.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8c7c1f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8c7c1f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8c7c1f2

Branch: refs/heads/master
Commit: f8c7c1f21aa9d1fd38b584ca8c4adf397966e9f7
Parents: 1e44dd0
Author: Jose Torres <[email protected]>
Authored: Wed Dec 13 22:31:39 2017 -0800
Committer: Shixiong Zhu <[email protected]>
Committed: Wed Dec 13 22:31:39 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaSource.scala |   1 +
 .../spark/sql/kafka010/KafkaSourceOffset.scala  |   3 +-
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |   1 +
 .../sql/sources/v2/ContinuousReadSupport.java   |  42 +++++
 .../sql/sources/v2/ContinuousWriteSupport.java  |  54 ++++++
 .../sql/sources/v2/DataSourceV2Options.java     |   8 +
 .../sql/sources/v2/MicroBatchReadSupport.java   |  52 ++++++
 .../sql/sources/v2/MicroBatchWriteSupport.java  |  58 ++++++
 .../sources/v2/reader/ContinuousDataReader.java |  36 ++++
 .../sql/sources/v2/reader/ContinuousReader.java |  68 +++++++
 .../sql/sources/v2/reader/MicroBatchReader.java |  64 +++++++
 .../spark/sql/sources/v2/reader/Offset.java     |  60 +++++++
 .../sql/sources/v2/reader/PartitionOffset.java  |  30 ++++
 .../sql/sources/v2/writer/ContinuousWriter.java |  41 +++++
 .../execution/streaming/BaseStreamingSink.java  |  27 +++
 .../streaming/BaseStreamingSource.java          |  37 ++++
 .../execution/streaming/FileStreamSource.scala  |   1 +
 .../streaming/FileStreamSourceOffset.scala      |   3 +
 .../sql/execution/streaming/LongOffset.scala    |   2 +
 .../spark/sql/execution/streaming/Offset.scala  |  34 +---
 .../sql/execution/streaming/OffsetSeq.scala     |   1 +
 .../sql/execution/streaming/OffsetSeqLog.scala  |   1 +
 .../streaming/RateSourceProvider.scala          |  22 ++-
 .../execution/streaming/RateStreamOffset.scala  |  29 +++
 .../streaming/RateStreamSourceV2.scala          | 162 +++++++++++++++++
 .../spark/sql/execution/streaming/Source.scala  |   1 +
 .../execution/streaming/StreamExecution.scala   |   1 +
 .../execution/streaming/StreamProgress.scala    |   2 +
 .../continuous/ContinuousRateStreamSource.scala | 152 ++++++++++++++++
 .../spark/sql/execution/streaming/memory.scala  |   1 +
 .../sql/execution/streaming/memoryV2.scala      | 178 +++++++++++++++++++
 .../spark/sql/execution/streaming/socket.scala  |   1 +
 .../execution/streaming/MemorySinkV2Suite.scala |  82 +++++++++
 .../execution/streaming/OffsetSeqLogSuite.scala |   1 +
 .../execution/streaming/RateSourceSuite.scala   |   1 +
 .../execution/streaming/RateSourceV2Suite.scala | 155 ++++++++++++++++
 .../sql/streaming/FileStreamSourceSuite.scala   |   1 +
 .../spark/sql/streaming/OffsetSuite.scala       |   3 +-
 .../spark/sql/streaming/StreamSuite.scala       |   1 +
 .../apache/spark/sql/streaming/StreamTest.scala |   1 +
 .../streaming/StreamingAggregationSuite.scala   |   1 +
 .../streaming/StreamingQueryListenerSuite.scala |   1 +
 .../sql/streaming/StreamingQuerySuite.scala     |   1 +
 .../test/DataStreamReaderWriterSuite.scala      |   1 +
 .../sql/streaming/util/BlockingSource.scala     |   3 +-
 45 files changed, 1390 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index e9cff04..87f31fc 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index b5da415..6e24423 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.execution.streaming.SerializedOffset
+import org.apache.spark.sql.sources.v2.reader.Offset
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of 
subscribed topics and

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 2034b9b..9cac0e5 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
new file mode 100644
index 0000000..ae4f858
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.sql.sources.v2.reader.ContinuousReader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data reading ability for continuous stream processing.
+ */
+public interface ContinuousReadSupport extends DataSourceV2 {
+  /**
+   * Creates a {@link ContinuousReader} to scan the data from this data source.
+   *
+   * @param schema the user provided schema, or empty() if none was provided
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be 
used for failure
+   *                           recovery. Readers for the same logical source 
in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is 
an immutable
+   *                case-insensitive string-to-string map.
+   */
+  ContinuousReader createContinuousReader(Optional<StructType> schema, String 
checkpointLocation, DataSourceV2Options options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
new file mode 100644
index 0000000..362d5f5
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data writing ability for continuous stream processing.
+ */
[email protected]
+public interface ContinuousWriteSupport extends BaseStreamingSink {
+
+    /**
+     * Creates an optional {@link ContinuousWriter} to save the data to this 
data source. Data
+     * sources can return None if there is no writing needed to be done.
+     *
+     * @param queryId A unique string for the writing query. It's possible 
that there are many writing
+     *                queries running at the same time, and the returned 
{@link DataSourceV2Writer}
+     *                can use this id to distinguish itself from others.
+     * @param schema the schema of the data to be written.
+     * @param mode the output mode which determines what successive epoch 
output means to this
+     *             sink, please refer to {@link OutputMode} for more details.
+     * @param options the options for the returned data source writer, which 
is an immutable
+     *                case-insensitive string-to-string map.
+     */
+    Optional<ContinuousWriter> createContinuousWriter(
+        String queryId,
+        StructType schema,
+        OutputMode mode,
+        DataSourceV2Options options);
+}
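
To make the epoch-tagged commit model concrete: the engine keeps producing commits as the task runs, each tagged with an epoch number, and the same epoch may be retried after a failure. The sketch below (EpochSink is hypothetical, and the ignore-on-retry behavior is an assumption about how a sink might keep retried epochs from being applied twice, not something this interface mandates):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink illustrating epoch-tagged commits: each commit carries an
// epoch number, and a retried (already-seen) epoch is ignored rather than
// applied a second time.
class EpochSink {
  private final List<String> committed = new ArrayList<>();
  private long lastEpoch = -1;

  void commit(long epochId, List<String> rows) {
    if (epochId <= lastEpoch) return;  // epoch already applied; skip the retry
    committed.addAll(rows);
    lastEpoch = epochId;
  }

  List<String> data() { return committed; }
}
```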

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
index e98c045..ddc2acc 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
@@ -36,6 +36,10 @@ public class DataSourceV2Options {
     return key.toLowerCase(Locale.ROOT);
   }
 
+  public static DataSourceV2Options empty() {
+    return new DataSourceV2Options(new HashMap<>());
+  }
+
   public DataSourceV2Options(Map<String, String> originalMap) {
     keyLowerCasedMap = new HashMap<>(originalMap.size());
     for (Map.Entry<String, String> entry : originalMap.entrySet()) {
@@ -43,6 +47,10 @@ public class DataSourceV2Options {
     }
   }
 
+  public Map<String, String> asMap() {
+    return new HashMap<>(keyLowerCasedMap);
+  }
+
   /**
    * Returns the option value to which the specified key is mapped, 
case-insensitively.
    */
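
The two helpers added above, empty() and asMap(), sit on DataSourceV2Options' case-insensitive map. A self-contained sketch of those semantics (OptionsSketch is a simplified stand-in, not the real class):

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Simplified stand-in for DataSourceV2Options: keys are lower-cased on the way
// in, so lookups are case-insensitive; asMap() returns a defensive copy of the
// lower-cased map rather than a live view.
class OptionsSketch {
  private final Map<String, String> keyLowerCasedMap;

  OptionsSketch(Map<String, String> originalMap) {
    keyLowerCasedMap = new HashMap<>(originalMap.size());
    for (Map.Entry<String, String> e : originalMap.entrySet()) {
      keyLowerCasedMap.put(e.getKey().toLowerCase(Locale.ROOT), e.getValue());
    }
  }

  // Mirrors the empty() helper added in this patch.
  static OptionsSketch empty() { return new OptionsSketch(new HashMap<>()); }

  Optional<String> get(String key) {
    return Optional.ofNullable(keyLowerCasedMap.get(key.toLowerCase(Locale.ROOT)));
  }

  // Mirrors the asMap() helper added in this patch.
  Map<String, String> asMap() { return new HashMap<>(keyLowerCasedMap); }
}
```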

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
new file mode 100644
index 0000000..442cad0
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.MicroBatchReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide streaming micro-batch data reading ability.
+ */
[email protected]
+public interface MicroBatchReadSupport extends DataSourceV2 {
+  /**
+   * Creates a {@link MicroBatchReader} to read batches of data from this data 
source in a
+   * streaming query.
+   *
+   * The execution engine will create a micro-batch reader at the start of a 
streaming query,
+   * alternate calls to setOffsetRange and createReadTasks for each batch to 
process, and then
+   * call stop() when the execution is complete. Note that a single query may 
have multiple
+   * executions due to restart or failure recovery.
+   *
+   * @param schema the user provided schema, or empty() if none was provided
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be 
used for failure
+   *                           recovery. Readers for the same logical source 
in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is 
an immutable
+   *                case-insensitive string-to-string map.
+   */
+  MicroBatchReader createMicroBatchReader(
+      Optional<StructType> schema,
+      String checkpointLocation,
+      DataSourceV2Options options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
new file mode 100644
index 0000000..6364077
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data writing ability and save the data from a microbatch to the 
data source.
+ */
[email protected]
+public interface MicroBatchWriteSupport extends BaseStreamingSink {
+
+  /**
+   * Creates an optional {@link DataSourceV2Writer} to save the data to this 
data source. Data
+   * sources can return None if there is no writing needed to be done.
+   *
+   * @param queryId A unique string for the writing query. It's possible that 
there are many writing
+   *                queries running at the same time, and the returned {@link 
DataSourceV2Writer}
+   *                can use this id to distinguish itself from others.
+   * @param epochId The uniquenumeric ID of the batch within this writing 
(Note: "uniquenumeric" in the line above should read "unique numeric".)
query. This is an
+   *                incrementing counter representing a consistent set of 
data; the same batch may
+   *                be started multiple times in failure recovery scenarios, 
but it will always
+   *                contain the same records.
+   * @param schema the schema of the data to be written.
+   * @param mode the output mode which determines what successive batch output 
means to this
+   *             sink, please refer to {@link OutputMode} for more details.
+   * @param options the options for the returned data source writer, which is 
an immutable
+   *                case-insensitive string-to-string map.
+   */
+  Optional<DataSourceV2Writer> createMicroBatchWriter(
+      String queryId,
+      long epochId,
+      StructType schema,
+      OutputMode mode,
+      DataSourceV2Options options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
new file mode 100644
index 0000000..11b99a9
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
+
+import java.io.IOException;
+
+/**
+ * A variation on {@link DataReader} for use with streaming in continuous 
processing mode.
+ */
+public interface ContinuousDataReader<T> extends DataReader<T> {
+    /**
+     * Get the offset of the current record, or the start offset if no records 
have been read.
+     *
+     * The execution engine will call this method along with get() to keep 
track of the current
+     * offset. When an epoch ends, the offset of the previous record in each 
partition will be saved
+     * as a restart checkpoint.
+     */
+    PartitionOffset getOffset();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
new file mode 100644
index 0000000..1baf82c
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can 
implement this
+ * interface to allow reading in a continuous processing mode stream.
+ *
+ * Implementations must ensure each read task output is a {@link 
ContinuousDataReader}.
+ */
+public interface ContinuousReader extends BaseStreamingSource, 
DataSourceV2Reader {
+    /**
+     * Merge offsets coming from {@link ContinuousDataReader} instances in 
each partition to
+     * a single global offset.
+     */
+    Offset mergeOffsets(PartitionOffset[] offsets);
+
+    /**
+     * Deserialize a JSON string into an Offset of the implementation-defined 
offset type.
+     * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+     */
+    Offset deserializeOffset(String json);
+
+    /**
+     * Set the desired start offset for read tasks created from this reader. 
The scan will start
+     * from the first record after the provided offset, or from an 
implementation-defined inferred
+     * starting point if no offset is provided.
+     */
+    void setOffset(Optional<Offset> start);
+
+    /**
+     * Return the specified or inferred start offset for this reader.
+     *
+     * @throws IllegalStateException if setOffset has not been called
+     */
+    Offset getStartOffset();
+
+    /**
+     * The execution engine will call this method in every epoch to determine 
if new read tasks need
+     * to be generated, which may be required if for example the underlying 
source system has had
+     * partitions added or removed.
+     *
+     * If true, the query will be shut down and restarted with a new reader.
+     */
+    default boolean needsReconfiguration() {
+        return false;
+    }
+}
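
mergeOffsets() above combines the per-partition offsets reported by each ContinuousDataReader into one global Offset. A toy illustration (PartitionPos and GlobalOffset are hypothetical stand-ins for a source's PartitionOffset and Offset types):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for a PartitionOffset: one partition's position.
class PartitionPos {
  final int partition;
  final long pos;
  PartitionPos(int partition, long pos) { this.partition = partition; this.pos = pos; }
}

// Stand-in for the reader's Offset type: a global offset is simply the map of
// every partition's last position, built from the per-partition reports.
class GlobalOffset {
  final Map<Integer, Long> positions = new HashMap<>();

  static GlobalOffset merge(PartitionPos[] offsets) {
    GlobalOffset g = new GlobalOffset();
    for (PartitionPos p : offsets) {
      g.positions.put(p.partition, p.pos);
    }
    return g;
  }
}
```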

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
new file mode 100644
index 0000000..438e3f5
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.Offset;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can 
implement this
+ * interface to indicate they allow micro-batch streaming reads.
+ */
+public interface MicroBatchReader extends DataSourceV2Reader, 
BaseStreamingSource {
+    /**
+     * Set the desired offset range for read tasks created from this reader. 
Read tasks will
+     * generate only data within (`start`, `end`]; that is, from the first 
record after `start` to
+     * the record with offset `end`.
+     *
+     * @param start The initial offset to scan from. If not specified, scan 
from an
+     *              implementation-specified start point, such as the earliest 
available record.
+     * @param end The last offset to include in the scan. If not specified, 
scan up to an
+     *            implementation-defined endpoint, such as the last available 
offset
+     *            or the start offset plus a target batch size.
+     */
+    void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
+
+    /**
+     * Returns the specified (if explicitly set through setOffsetRange) or 
inferred start offset
+     * for this reader.
+     *
+     * @throws IllegalStateException if setOffsetRange has not been called
+     */
+    Offset getStartOffset();
+
+    /**
+     * Return the specified (if explicitly set through setOffsetRange) or 
inferred end offset
+     * for this reader.
+     *
+     * @throws IllegalStateException if setOffsetRange has not been called
+     */
+    Offset getEndOffset();
+
+    /**
+     * Deserialize a JSON string into an Offset of the implementation-defined 
offset type.
+     * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+     */
+    Offset deserializeOffset(String json);
+}
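
The half-open range contract of setOffsetRange above — read tasks produce only data within (`start`, `end`], i.e. from the first record after `start` through the record at `end` — can be shown with a toy source whose offsets are plain indexes into an in-memory log (MicroBatchSketch is hypothetical; real sources define their own Offset type):

```java
import java.util.ArrayList;
import java.util.List;

// Toy micro-batch read over an in-memory log, using indexes as offsets.
// The batch excludes the record at `start` and includes the record at `end`,
// matching the (`start`, `end`] contract of setOffsetRange.
class MicroBatchSketch {
  static List<String> readBatch(List<String> log, int start, int end) {
    List<String> batch = new ArrayList<>();
    for (int i = start + 1; i <= end; i++) {  // first record AFTER start, through end inclusive
      batch.add(log.get(i));
    }
    return batch;
  }
}
```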

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
new file mode 100644
index 0000000..1ebd353
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+/**
+ * An abstract representation of progress through a [[MicroBatchReader]] or [[ContinuousReader]].
+ * During execution, Offsets provided by the data source implementation will be logged and used as
+ * restart checkpoints. Sources should provide an Offset implementation which they can use to
+ * reconstruct the stream position where the offset was taken.
+ */
+public abstract class Offset {
+    /**
+     * A JSON-serialized representation of an Offset that is
+     * used for saving offsets to the offset log.
+     * Note: We assume that equivalent/equal offsets serialize to
+     * identical JSON strings.
+     *
+     * @return JSON string encoding
+     */
+    public abstract String json();
+
+    /**
+     * Equality based on JSON string representation. We leverage the
+     * JSON representation for normalization between the Offset's
+     * in memory and on disk representations.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof Offset) {
+            return this.json().equals(((Offset) obj).json());
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return this.json().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return this.json();
+    }
+}
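The JSON-based equality above means a concrete Offset only has to implement `json()`. Here is a minimal sketch of such a subclass; `OffsetSketch`/`LongOffsetSketch` are illustrative names standing in for the Spark classes, so this compiles without Spark on the classpath.

```java
// Minimal re-creation of the Offset pattern above: equals/hashCode delegate to json().
// OffsetSketch mimics the abstract base; LongOffsetSketch is a toy concrete offset.
abstract class OffsetSketch {
    public abstract String json();

    @Override
    public boolean equals(Object obj) {
        return (obj instanceof OffsetSketch) && json().equals(((OffsetSketch) obj).json());
    }

    @Override
    public int hashCode() { return json().hashCode(); }
}

final class LongOffsetSketch extends OffsetSketch {
    private final long offset;
    LongOffsetSketch(long offset) { this.offset = offset; }

    // Equivalent offsets must serialize to identical JSON strings for equality to hold.
    @Override
    public String json() { return Long.toString(offset); }
}
```

Note the stated assumption that equal offsets serialize identically: if two equivalent offsets produced different JSON, restart-checkpoint comparison would spuriously report new data.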

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java
new file mode 100644
index 0000000..07826b6
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+/**
+ * Used for per-partition offsets in continuous processing. ContinuousReader implementations will
+ * provide a method to merge these into a global Offset.
+ *
+ * These offsets must be serializable.
+ */
+public interface PartitionOffset extends Serializable {
+    
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java
new file mode 100644
index 0000000..618f47e
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A {@link DataSourceV2Writer} for use with continuous stream processing.
+ */
[email protected]
+public interface ContinuousWriter extends DataSourceV2Writer {
+  /**
+   * Commits this writing job for the specified epoch with a list of commit messages. The commit
+   * messages are collected from successful data writers and are produced by
+   * {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
+   */
+  void commit(long epochId, WriterCommitMessage[] messages);
+
+  default void commit(WriterCommitMessage[] messages) {
+    throw new UnsupportedOperationException(
+       "Commit without epoch should not be called with ContinuousWriter");
+  }
+}
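The default-method trick above, where the inherited epoch-less `commit` is disabled so only the epoch-tagged overload can be called on a continuous writer, can be reproduced outside Spark. The interface and class names below are illustrative stand-ins, not the Spark API.

```java
// Standalone sketch of the ContinuousWriter pattern above: the epoch-tagged commit is
// the real entry point, and the batch-style commit inherited from the parent interface
// is rejected via a default method. Names here are illustrative, not Spark's.
interface WriterSketch {
    void commit(String[] messages);
}

interface ContinuousWriterSketch extends WriterSketch {
    void commit(long epochId, String[] messages);

    // Continuous writers must always receive an epoch; reject the epoch-less call.
    default void commit(String[] messages) {
        throw new UnsupportedOperationException(
            "Commit without epoch should not be called with ContinuousWriter");
    }
}

final class RecordingWriter implements ContinuousWriterSketch {
    long lastEpoch = -1;

    @Override
    public void commit(long epochId, String[] messages) { lastEpoch = epochId; }
}
```

Because the default method overrides the parent's abstract `commit`, even callers holding only a `WriterSketch` reference hit the exception at runtime rather than silently committing without an epoch.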

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java
new file mode 100644
index 0000000..ac96c27
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming;
+
+/**
+ * The shared interface between V1 and V2 streaming sinks.
+ *
+ * This is a temporary interface for compatibility during migration. It should not be implemented
+ * directly, and will be removed in future versions.
+ */
+public interface BaseStreamingSink {
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java
new file mode 100644
index 0000000..3a02cbf
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.Offset;
+
+/**
+ * The shared interface between V1 streaming sources and V2 streaming readers.
+ *
+ * This is a temporary interface for compatibility during migration. It should not be implemented
+ * directly, and will be removed in future versions.
+ */
+public interface BaseStreamingSource {
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  void commit(Offset end);
+
+  /** Stop this source and free any resources it has allocated. */
+  void stop();
+}
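The `commit(end)` contract above is what lets a source reclaim resources: data at or below the committed offset will never be requested again, so a buffering source can discard it. A hypothetical buffered source (not from this PR; `BufferedSourceSketch` and its methods are illustrative) might do:

```java
import java.util.TreeMap;

// Hypothetical buffered source illustrating the commit(end) contract above: once Spark
// commits an offset, everything buffered at or below it can be dropped, because only
// offsets strictly greater than `end` will ever be requested again.
final class BufferedSourceSketch {
    private final TreeMap<Long, String> buffer = new TreeMap<>();

    void add(long offset, String record) { buffer.put(offset, record); }

    // Mirrors BaseStreamingSource.commit: free all records with offset <= end.
    void commit(long end) {
        buffer.headMap(end, true).clear();  // inclusive head view; clearing prunes the map
    }

    int buffered() { return buffer.size(); }
}
```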

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0debd7d..a33b785 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -27,6 +27,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types.StructType
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
index 06d0fe6..431e5b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
@@ -22,8 +22,11 @@ import scala.util.control.Exception._
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.sql.sources.v2.reader.Offset
+
 /**
  * Offset for the [[FileStreamSource]].
+ *
  * @param logOffset  Position in the [[FileStreamSourceLog]]
  */
 case class FileStreamSourceOffset(logOffset: Long) extends Offset {

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index 5f0b195..7ea3146 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.sql.sources.v2.reader.Offset
+
 /**
  * A simple offset for sources that produce a single linear stream of data.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
index 4efcee0..73f0c62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
@@ -17,38 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
-/**
- * An offset is a monotonically increasing metric used to track progress in the computation of a
- * stream. Since offsets are retrieved from a [[Source]] by a single thread, we know the global
- * ordering of two [[Offset]] instances.  We do assume that if two offsets are `equal` then no
- * new data has arrived.
- */
-abstract class Offset {
-
-  /**
-   * Equality based on JSON string representation. We leverage the
-   * JSON representation for normalization between the Offset's
-   * in memory and on disk representations.
-   */
-  override def equals(obj: Any): Boolean = obj match {
-    case o: Offset => this.json == o.json
-    case _ => false
-  }
-
-  override def hashCode(): Int = this.json.hashCode
+import org.apache.spark.sql.sources.v2.reader.Offset
 
-  override def toString(): String = this.json.toString
-
-  /**
-   * A JSON-serialized representation of an Offset that is
-   * used for saving offsets to the offset log.
-   * Note: We assume that equivalent/equal offsets serialize to
-   * identical JSON strings.
-   *
-   * @return JSON string encoding
-   */
-  def json: String
-}
 
 /**
  * Used when loading a JSON serialized offset from external storage.
@@ -58,3 +28,5 @@ abstract class Offset {
  * that accepts a [[SerializedOffset]] for doing the conversion.
  */
 case class SerializedOffset(override val json: String) extends Offset
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 4e0a468..dcc5935 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -23,6 +23,7 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.internal.SQLConf.{SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS}
+import org.apache.spark.sql.sources.v2.reader.Offset
 
 /**
  * An ordered collection of offsets, used to track the progress of processing data from one or more

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index e3f4abc..bfdbc65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets._
 import scala.io.{Source => IOSource}
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.sources.v2.reader.Offset
 
 /**
  * This class is used to log offsets to persistent files in HDFS.

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index 077a477..50671a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io._
 import java.nio.charset.StandardCharsets
+import java.util.Optional
 import java.util.concurrent.TimeUnit
 
 import org.apache.commons.io.IOUtils
@@ -28,7 +29,10 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamReader
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader, Offset}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{ManualClock, SystemClock}
 
@@ -46,7 +50,8 @@ import org.apache.spark.util.{ManualClock, SystemClock}
  *    generated rows. The source will try its best to reach `rowsPerSecond`, but the query may
  *    be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
  */
-class RateSourceProvider extends StreamSourceProvider with DataSourceRegister {
+class RateSourceProvider extends StreamSourceProvider with DataSourceRegister
+  with DataSourceV2 with MicroBatchReadSupport with ContinuousReadSupport {
 
   override def sourceSchema(
       sqlContext: SQLContext,
@@ -100,6 +105,21 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister {
      params.get("useManualClock").map(_.toBoolean).getOrElse(false) // Only for testing
     )
   }
+
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
+      checkpointLocation: String,
+      options: DataSourceV2Options): MicroBatchReader = {
+    new RateStreamV2Reader(options)
+  }
+
+  override def createContinuousReader(
+      schema: Optional[StructType],
+      checkpointLocation: String,
+      options: DataSourceV2Options): ContinuousReader = {
+    new ContinuousRateStreamReader(options)
+  }
+
   override def shortName(): String = "rate"
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
new file mode 100644
index 0000000..13679df
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.sources.v2.reader.Offset
+
+case class RateStreamOffset(partitionToValueAndRunTimeMs: Map[Int, (Long, Long)])
+  extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToValueAndRunTimeMs)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala
new file mode 100644
index 0000000..102551c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
+import org.apache.spark.util.SystemClock
+
+class RateStreamV2Reader(options: DataSourceV2Options)
+  extends MicroBatchReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val clock = new SystemClock
+
+  private val numPartitions =
+    options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
+  private val rowsPerSecond =
+    options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong
+
+  // The interval (in milliseconds) between rows in each partition.
+  // e.g. if there are 4 global rows per second, and 2 partitions, each partition
+  // should output rows every (1000 * 2 / 4) = 500 ms.
+  private val msPerPartitionBetweenRows = (1000 * numPartitions) / rowsPerSecond
+
+  override def readSchema(): StructType = {
+    StructType(
+      StructField("timestamp", TimestampType, false) ::
+      StructField("value", LongType, false) :: Nil)
+  }
+
+  val creationTimeMs = clock.getTimeMillis()
+
+  private var start: RateStreamOffset = _
+  private var end: RateStreamOffset = _
+
+  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
+    this.start = start.orElse(
+      RateStreamSourceV2.createInitialOffset(numPartitions, creationTimeMs))
+      .asInstanceOf[RateStreamOffset]
+
+    this.end = end.orElse {
+      val currentTime = clock.getTimeMillis()
+      RateStreamOffset(
+        this.start.partitionToValueAndRunTimeMs.map {
+          case startOffset @ (part, (currentVal, currentReadTime)) =>
+            // Calculate the number of rows we should advance in this partition (based on the
+            // current time), and output a corresponding offset.
+            val readInterval = currentTime - currentReadTime
+            val numNewRows = readInterval / msPerPartitionBetweenRows
+            if (numNewRows <= 0) {
+              startOffset
+            } else {
+              (part,
+                (currentVal + (numNewRows * numPartitions),
+                currentReadTime + (numNewRows * msPerPartitionBetweenRows)))
+            }
+        }
+      )
+    }.asInstanceOf[RateStreamOffset]
+  }
+
+  override def getStartOffset(): Offset = {
+    if (start == null) throw new IllegalStateException("start offset not set")
+    start
+  }
+  override def getEndOffset(): Offset = {
+    if (end == null) throw new IllegalStateException("end offset not set")
+    end
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    RateStreamOffset(Serialization.read[Map[Int, (Long, Long)]](json))
+  }
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+    val startMap = start.partitionToValueAndRunTimeMs
+    val endMap = end.partitionToValueAndRunTimeMs
+    endMap.keys.toSeq.map { part =>
+      val (endVal, _) = endMap(part)
+      val (startVal, startTimeMs) = startMap(part)
+
+      val packedRows = mutable.ListBuffer[(Long, Long)]()
+      var outVal = startVal + numPartitions
+      var outTimeMs = startTimeMs + msPerPartitionBetweenRows
+      while (outVal <= endVal) {
+        packedRows.append((outTimeMs, outVal))
+        outVal += numPartitions
+        outTimeMs += msPerPartitionBetweenRows
+      }
+
+      RateStreamBatchTask(packedRows).asInstanceOf[ReadTask[Row]]
+    }.toList.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+}
+
+case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends ReadTask[Row] {
+  override def createDataReader(): DataReader[Row] = new RateStreamBatchReader(vals)
+}
+
+class RateStreamBatchReader(vals: Seq[(Long, Long)]) extends DataReader[Row] {
+  var currentIndex = -1
+
+  override def next(): Boolean = {
+    // Return true as long as the new index is in the seq.
+    currentIndex += 1
+    currentIndex < vals.size
+  }
+
+  override def get(): Row = {
+    Row(
+      DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(vals(currentIndex)._1)),
+      vals(currentIndex)._2)
+  }
+
+  override def close(): Unit = {}
+}
+
+object RateStreamSourceV2 {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+
+  private[sql] def createInitialOffset(numPartitions: Int, creationTimeMs: Long) = {
+    RateStreamOffset(
+      Range(0, numPartitions).map { i =>
+        // Note that the starting offset is exclusive, so we have to decrement the starting value
+        // by the increment that will later be applied. The first row output in each
+        // partition will have a value equal to the partition index.
+        (i,
+          ((i - numPartitions).toLong,
+            creationTimeMs))
+      }.toMap)
+  }
+}
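The pacing arithmetic in RateStreamV2Reader above can be checked with plain numbers. This sketch re-implements only the offset-advance formula from `setOffsetRange` (interval elapsed divided by per-row interval, value advanced by `numNewRows * numPartitions`); `RatePacingSketch` and its method are illustrative names, not the Spark implementation.

```java
// Re-derivation of the pacing arithmetic in RateStreamV2Reader above. With rowsPerSecond
// global rows spread over numPartitions partitions, each partition emits one row every
// (1000 * numPartitions / rowsPerSecond) ms. Names are illustrative, not Spark's.
final class RatePacingSketch {
    final int numPartitions;
    final long msPerPartitionBetweenRows;

    RatePacingSketch(int numPartitions, long rowsPerSecond) {
        this.numPartitions = numPartitions;
        this.msPerPartitionBetweenRows = (1000L * numPartitions) / rowsPerSecond;
    }

    // Advance one partition's (value, lastReadTimeMs) pair to currentTimeMs,
    // mirroring the per-partition case in setOffsetRange.
    long[] advance(long currentVal, long currentReadTimeMs, long currentTimeMs) {
        long numNewRows = (currentTimeMs - currentReadTimeMs) / msPerPartitionBetweenRows;
        if (numNewRows <= 0) {
            return new long[] {currentVal, currentReadTimeMs};  // no full interval elapsed
        }
        return new long[] {
            currentVal + numNewRows * numPartitions,
            currentReadTimeMs + numNewRows * msPerPartitionBetweenRows
        };
    }
}
```

With 2 partitions and 4 rows per second this reproduces the 500 ms spacing from the comment in the diff, and only whole row intervals advance the offset, matching the `numNewRows <= 0` guard.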

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 311942f..dbb408f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types.StructType
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 406560c..16063c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index a3f3662..770db40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
 
 import scala.collection.{immutable, GenTraversableOnce}
 
+import org.apache.spark.sql.sources.v2.reader.Offset
+
 /**
  * A helper class that looks like a Map[Source, Offset].
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
new file mode 100644
index 0000000..77fc267
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+case class ContinuousRateStreamPartitionOffset(
+    partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val creationTime = System.currentTimeMillis()
+
+  val numPartitions = options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong
+  val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+    assert(offsets.length == numPartitions)
+    val tuples = offsets.map {
+      case ContinuousRateStreamPartitionOffset(i, currVal, nextRead) => (i, (currVal, nextRead))
+    }
+    RateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    RateStreamOffset(Serialization.read[Map[Int, (Long, Long)]](json))
+  }
+
+  override def readSchema(): StructType = RateSourceProvider.SCHEMA
+
+  private var offset: Offset = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+    this.offset = offset.orElse(
+      RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
+  }
+
+  override def getStartOffset(): Offset = offset
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+    val partitionStartMap = offset match {
+      case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
+      case off =>
+        throw new IllegalArgumentException(
+          s"invalid offset type ${off.getClass()} for ContinuousRateSource")
+    }
+    if (partitionStartMap.keySet.size != numPartitions) {
+      throw new IllegalArgumentException(
+        s"The previous run contained ${partitionStartMap.keySet.size} partitions, but" +
+        s" $numPartitions partitions are currently configured. The numPartitions option" +
+        " cannot be changed.")
+    }
+
+    Range(0, numPartitions).map { i =>
+      val start = partitionStartMap(i)
+      // Have each partition advance by numPartitions each row, with starting points staggered
+      // by their partition index.
+      RateStreamReadTask(
+        start._1, // starting row value
+        start._2, // starting time in ms
+        i,
+        numPartitions,
+        perPartitionRate)
+        .asInstanceOf[ReadTask[Row]]
+    }.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+
+}
+
+case class RateStreamReadTask(
+    startValue: Long,
+    startTimeMs: Long,
+    partitionIndex: Int,
+    increment: Long,
+    rowsPerSecond: Double)
+  extends ReadTask[Row] {
+  override def createDataReader(): DataReader[Row] =
+    new RateStreamDataReader(startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)
+}
+
+class RateStreamDataReader(
+    startValue: Long,
+    startTimeMs: Long,
+    partitionIndex: Int,
+    increment: Long,
+    rowsPerSecond: Double)
+  extends ContinuousDataReader[Row] {
+  private var nextReadTime: Long = startTimeMs
+  private val readTimeIncrement: Long = (1000 / rowsPerSecond).toLong
+
+  private var currentValue = startValue
+  private var currentRow: Row = null
+
+  override def next(): Boolean = {
+    currentValue += increment
+    nextReadTime += readTimeIncrement
+
+    try {
+      while (System.currentTimeMillis < nextReadTime) {
+        Thread.sleep(nextReadTime - System.currentTimeMillis)
+      }
+    } catch {
+      case _: InterruptedException =>
+        // Someone's trying to end the task; just let them.
+        return false
+    }
+
+    currentRow = Row(
+      DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(nextReadTime)),
+      currentValue)
+
+    true
+  }
+
+  override def get: Row = currentRow
+
+  override def close(): Unit = {}
+
+  override def getOffset(): PartitionOffset =
+    ContinuousRateStreamPartitionOffset(partitionIndex, currentValue, nextReadTime)
+}
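For reference, the pacing logic that `RateStreamDataReader.next()` implements can be modeled outside Spark: each call advances the row value by `numPartitions` (so partitions interleave without overlap) and the target read time by `1000 / rowsPerSecond` milliseconds, then sleeps until that time. A language-neutral sketch in Python (function and parameter names are illustrative, not part of any Spark API; the real reader also blocks on the wall clock, which this sketch omits):

```python
def rate_stream_rows(start_value, start_time_ms, partition_index,
                     num_partitions, rows_per_second, n_rows):
    """Reproduce the (timestamp_ms, value) sequence one partition emits.

    Mirrors RateStreamDataReader.next(): increment first, then record, so the
    first emitted value is start_value + num_partitions.
    """
    read_time_increment = int(1000 / rows_per_second)  # ms between rows in this partition
    value, next_read_time = start_value, start_time_ms
    rows = []
    for _ in range(n_rows):
        value += num_partitions       # stagger partitions by their index
        next_read_time += read_time_increment
        rows.append((next_read_time, value))
    return rows

# Partition 1 of 2, 20 rows/sec per partition, start value chosen so the
# first emitted value equals the partition index (as the suite's test expects):
rows = rate_stream_rows(-1, 0, 1, 2, 20, 3)
# → [(50, 1), (100, 3), (150, 5)]
```

This matches the `partitionIndex + rowIndex * numPartitions` progression asserted in the "continuous data" test below.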

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 3041d4d..db07175 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala
new file mode 100644
index 0000000..437040c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
+import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
+ * tests and does not provide durability.
+ */
+class MemorySinkV2 extends DataSourceV2
+  with MicroBatchWriteSupport with ContinuousWriteSupport with Logging {
+
+  override def createMicroBatchWriter(
+      queryId: String,
+      batchId: Long,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = {
+    java.util.Optional.of(new MemoryWriter(this, batchId, mode))
+  }
+
+  override def createContinuousWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = {
+    java.util.Optional.of(new ContinuousMemoryWriter(this, mode))
+  }
+
+  private case class AddedData(batchId: Long, data: Array[Row])
+
+  /** An ordered list of batches that have been written to this [[Sink]]. */
+  @GuardedBy("this")
+  private val batches = new ArrayBuffer[AddedData]()
+
+  /** Returns all rows that are stored in this [[Sink]]. */
+  def allData: Seq[Row] = synchronized {
+    batches.flatMap(_.data)
+  }
+
+  def latestBatchId: Option[Long] = synchronized {
+    batches.lastOption.map(_.batchId)
+  }
+
+  def latestBatchData: Seq[Row] = synchronized {
+    batches.lastOption.toSeq.flatten(_.data)
+  }
+
+  def toDebugString: String = synchronized {
+    batches.map { case AddedData(batchId, data) =>
+      val dataStr = try data.mkString(" ") catch {
+        case NonFatal(e) => "[Error converting to string]"
+      }
+      s"$batchId: $dataStr"
+    }.mkString("\n")
+  }
+
+  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = {
+    val notCommitted = synchronized {
+      latestBatchId.isEmpty || batchId > latestBatchId.get
+    }
+    if (notCommitted) {
+      logDebug(s"Committing batch $batchId to $this")
+      outputMode match {
+        case Append | Update =>
+          val rows = AddedData(batchId, newRows)
+          synchronized { batches += rows }
+
+        case Complete =>
+          val rows = AddedData(batchId, newRows)
+          synchronized {
+            batches.clear()
+            batches += rows
+          }
+
+        case _ =>
+          throw new IllegalArgumentException(
+            s"Output mode $outputMode is not supported by MemorySink")
+      }
+    } else {
+      logDebug(s"Skipping already committed batch: $batchId")
+    }
+  }
+
+  def clear(): Unit = synchronized {
+    batches.clear()
+  }
+
+  override def toString(): String = "MemorySink"
+}
+
+case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {}
+
+class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode)
+  extends DataSourceV2Writer with Logging {
+
+  override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode)
+
+  def commit(messages: Array[WriterCommitMessage]): Unit = {
+    val newRows = messages.flatMap {
+      case message: MemoryWriterCommitMessage => message.data
+    }
+    sink.write(batchId, outputMode, newRows)
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    // Don't accept any of the new input.
+  }
+}
+
+class ContinuousMemoryWriter(val sink: MemorySinkV2, outputMode: OutputMode)
+  extends ContinuousWriter {
+
+  override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode)
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    val newRows = messages.flatMap {
+      case message: MemoryWriterCommitMessage => message.data
+    }
+    sink.write(epochId, outputMode, newRows)
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    // Don't accept any of the new input.
+  }
+}
+
+case class MemoryWriterFactory(outputMode: OutputMode) extends DataWriterFactory[Row] {
+  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+    new MemoryDataWriter(partitionId, outputMode)
+  }
+}
+
+class MemoryDataWriter(partition: Int, outputMode: OutputMode)
+  extends DataWriter[Row] with Logging {
+
+  private val data = mutable.Buffer[Row]()
+
+  override def write(row: Row): Unit = {
+    data.append(row)
+  }
+
+  override def commit(): MemoryWriterCommitMessage = {
+    val msg = MemoryWriterCommitMessage(partition, data.clone())
+    data.clear()
+    msg
+  }
+
+  override def abort(): Unit = {}
+}
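The commit semantics above are compact enough to model directly: `Append`/`Update` accumulate batches, `Complete` replaces everything, and a batch (or epoch, for the continuous writer) whose id is not greater than the latest committed one is silently skipped, which makes replayed commits idempotent. A hedged, language-neutral sketch in Python — a toy model of this behavior, not the Spark implementation:

```python
class ToyMemorySink:
    """Models MemorySinkV2.write: Append/Update accumulate, Complete replaces,
    and a batch id at or below the latest committed one is ignored."""

    def __init__(self):
        self.batches = []  # list of (batch_id, rows), ordered by commit

    def latest_batch_id(self):
        return self.batches[-1][0] if self.batches else None

    def write(self, batch_id, output_mode, new_rows):
        latest = self.latest_batch_id()
        if latest is not None and batch_id <= latest:
            return  # already committed; skip the duplicate batch/epoch
        if output_mode == "complete":
            self.batches.clear()  # Complete mode replaces all prior output
        self.batches.append((batch_id, list(new_rows)))

    def all_data(self):
        return [row for _, rows in self.batches for row in rows]


sink = ToyMemorySink()
sink.write(0, "append", [1, 2])
sink.write(0, "append", [1, 2])   # replayed batch 0: skipped
sink.write(1, "append", [3])
# sink.all_data() → [1, 2, 3]
sink.write(2, "complete", [9])
# sink.all_data() → [9]
```

The real sink additionally synchronizes on `this`, since micro-batch and continuous execution can drive commits from other threads.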

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index 0b22cbc..440cae0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
new file mode 100644
index 0000000..be4b490
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+
+class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
+  test("data writer") {
+    val partition = 1234
+    val writer = new MemoryDataWriter(partition, OutputMode.Append())
+    writer.write(Row(1))
+    writer.write(Row(2))
+    writer.write(Row(44))
+    val msg = writer.commit()
+    assert(msg.data.map(_.getInt(0)) == Seq(1, 2, 44))
+    assert(msg.partition == partition)
+
+    // Buffer should be cleared, so repeated commits should give empty.
+    assert(writer.commit().data.isEmpty)
+  }
+
+  test("continuous writer") {
+    val sink = new MemorySinkV2
+    val writer = new ContinuousMemoryWriter(sink, OutputMode.Append())
+    writer.commit(0,
+      Array(
+        MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
+        MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
+        MemoryWriterCommitMessage(2, Seq(Row(6), Row(7)))
+      ))
+    assert(sink.latestBatchId.contains(0))
+    assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7))
+    writer.commit(19,
+      Array(
+        MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))),
+        MemoryWriterCommitMessage(0, Seq(Row(33)))
+      ))
+    assert(sink.latestBatchId.contains(19))
+    assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(11, 22, 33))
+
+    assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33))
+  }
+
+  test("microbatch writer") {
+    val sink = new MemorySinkV2
+    new MemoryWriter(sink, 0, OutputMode.Append()).commit(
+      Array(
+        MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
+        MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
+        MemoryWriterCommitMessage(2, Seq(Row(6), Row(7)))
+      ))
+    assert(sink.latestBatchId.contains(0))
+    assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7))
+    new MemoryWriter(sink, 19, OutputMode.Append()).commit(
+      Array(
+        MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))),
+        MemoryWriterCommitMessage(0, Seq(Row(33)))
+      ))
+    assert(sink.latestBatchId.contains(19))
+    assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(11, 22, 33))
+
+    assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index e6cdc06..4868ba4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.test.SharedSQLContext
 
 class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
index 03d0f63..ceba27b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
 import org.apache.spark.util.ManualClock
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
new file mode 100644
index 0000000..ef801ce
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport}
+import org.apache.spark.sql.streaming.StreamTest
+
+class RateSourceV2Suite extends StreamTest {
+  test("microbatch in registry") {
+    DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match {
+      case ds: MicroBatchReadSupport =>
+        val reader = ds.createMicroBatchReader(Optional.empty(), "", DataSourceV2Options.empty())
+        assert(reader.isInstanceOf[RateStreamV2Reader])
+      case _ =>
+        throw new IllegalStateException("Could not find v2 read support for rate")
+    }
+  }
+
+  test("microbatch - numPartitions propagated") {
+    val reader = new RateStreamV2Reader(
+      new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
+    reader.setOffsetRange(Optional.empty(), Optional.empty())
+    val tasks = reader.createReadTasks()
+    assert(tasks.size == 11)
+  }
+
+  test("microbatch - set offset") {
+    val reader = new RateStreamV2Reader(DataSourceV2Options.empty())
+    val startOffset = RateStreamOffset(Map((0, (0, 1000))))
+    val endOffset = RateStreamOffset(Map((0, (0, 2000))))
+    reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
+    assert(reader.getStartOffset() == startOffset)
+    assert(reader.getEndOffset() == endOffset)
+  }
+
+  test("microbatch - infer offsets") {
+    val reader = new RateStreamV2Reader(
+      new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "100").asJava))
+    reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100)
+    reader.setOffsetRange(Optional.empty(), Optional.empty())
+    reader.getStartOffset() match {
+      case r: RateStreamOffset =>
+        assert(r.partitionToValueAndRunTimeMs(0)._2 == reader.creationTimeMs)
+      case _ => throw new IllegalStateException("unexpected offset type")
+    }
+    reader.getEndOffset() match {
+      case r: RateStreamOffset =>
+        // End offset may be a bit beyond 100 ms/9 rows after creation if the wait lasted
+        // longer than 100ms. It should never be early.
+        assert(r.partitionToValueAndRunTimeMs(0)._1 >= 9)
+        assert(r.partitionToValueAndRunTimeMs(0)._2 >= reader.creationTimeMs + 100)
+
+      case _ => throw new IllegalStateException("unexpected offset type")
+    }
+  }
+
+  test("microbatch - predetermined batch size") {
+    val reader = new RateStreamV2Reader(
+      new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "20").asJava))
+    val startOffset = RateStreamOffset(Map((0, (0, 1000))))
+    val endOffset = RateStreamOffset(Map((0, (20, 2000))))
+    reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
+    val tasks = reader.createReadTasks()
+    assert(tasks.size == 1)
+    assert(tasks.get(0).asInstanceOf[RateStreamBatchTask].vals.size == 20)
+  }
+
+  test("microbatch - data read") {
+    val reader = new RateStreamV2Reader(
+      new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
+    val startOffset = RateStreamSourceV2.createInitialOffset(11, reader.creationTimeMs)
+    val endOffset = RateStreamOffset(startOffset.partitionToValueAndRunTimeMs.toSeq.map {
+      case (part, (currentVal, currentReadTime)) =>
+        (part, (currentVal + 33, currentReadTime + 1000))
+    }.toMap)
+
+    reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
+    val tasks = reader.createReadTasks()
+    assert(tasks.size == 11)
+
+    val readData = tasks.asScala
+      .map(_.createDataReader())
+      .flatMap { reader =>
+        val buf = scala.collection.mutable.ListBuffer[Row]()
+        while (reader.next()) buf.append(reader.get())
+        buf
+      }
+
+    assert(readData.map(_.getLong(1)).sorted == Range(0, 33))
+  }
+
+  test("continuous in registry") {
+    DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match {
+      case ds: ContinuousReadSupport =>
+        val reader = ds.createContinuousReader(Optional.empty(), "", DataSourceV2Options.empty())
+        assert(reader.isInstanceOf[ContinuousRateStreamReader])
+      case _ =>
+        throw new IllegalStateException("Could not find v2 read support for rate")
+    }
+  }
+
+  test("continuous data") {
+    val reader = new ContinuousRateStreamReader(
+      new DataSourceV2Options(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
+    reader.setOffset(Optional.empty())
+    val tasks = reader.createReadTasks()
+    assert(tasks.size == 2)
+
+    val data = scala.collection.mutable.ListBuffer[Row]()
+    tasks.asScala.foreach {
+      case t: RateStreamReadTask =>
+        val startTimeMs = reader.getStartOffset()
+          .asInstanceOf[RateStreamOffset]
+          .partitionToValueAndRunTimeMs(t.partitionIndex)
+          ._2
+        val r = t.createDataReader().asInstanceOf[RateStreamDataReader]
+        for (rowIndex <- 0 to 9) {
+          r.next()
+          data.append(r.get())
+          assert(r.getOffset() ==
+            ContinuousRateStreamPartitionOffset(
+              t.partitionIndex,
+              t.partitionIndex + rowIndex * 2,
+              startTimeMs + (rowIndex + 1) * 100))
+        }
+        assert(System.currentTimeMillis() >= startTimeMs + 1000)
+
+      case _ => throw new IllegalStateException("Unexpected task type")
+    }
+
+    assert(data.map(_.getLong(1)).toSeq.sorted == Range(0, 20))
+  }
+}
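The continuous reader's offset bookkeeping, exercised throughout this suite, reduces to two operations: merging per-partition `(value, timeMs)` pairs into one global map (`mergeOffsets`) and round-tripping that map through JSON (`deserializeOffset`). A sketch of that round trip in Python, with illustrative names (the real implementation uses json4s and Scala tuples):

```python
import json

def merge_offsets(partition_offsets):
    """partition_offsets: iterable of (partition, value, time_ms) triples,
    one per partition -> global offset map {partition: (value, time_ms)}."""
    return {p: (v, t) for p, v, t in partition_offsets}

def serialize_offset(offset):
    # JSON object keys must be strings; tuples become two-element arrays.
    return json.dumps({str(p): list(vt) for p, vt in offset.items()})

def deserialize_offset(s):
    return {int(p): tuple(vt) for p, vt in json.loads(s).items()}

off = merge_offsets([(0, 10, 1000), (1, 11, 1005)])
# deserialize_offset(serialize_offset(off)) recovers {0: (10, 1000), 1: (11, 1005)}
```

The key design point mirrored here is that the global offset is just the collection of partition offsets, so the engine can reconstruct a restart position from whatever each partition last reported.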

