[SPARK-23268][SQL] Reorganize packages in data source V2

## What changes were proposed in this pull request?
1. Create a new package for partitioning/distribution related classes.
    Since Spark will add new concrete implementations of `Distribution` in future releases, it is better to keep the partitioning/distribution related classes in their own package.

2. Move the streaming related classes to the package `org.apache.spark.sql.sources.v2.reader/writer.streaming`, instead of `org.apache.spark.sql.sources.v2.streaming.reader/writer`, so that there is no longer a `reader`/`writer` package nested inside the `streaming` package, which was quite confusing. A short sketch of the resulting import layout follows the package trees below.
Before change:
```
v2
├── reader
├── streaming
│   ├── reader
│   └── writer
└── writer
```

After change:
```
v2
├── reader
│   └── streaming
└── writer
    └── streaming
```
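
For implementers of V2 data sources, the visible effect of both moves is simply new import paths. Below is a minimal, hypothetical Scala sketch (the class name, column name, and partition count are made up; it only assumes the Spark SQL 2.3 artifacts on the classpath) showing where the relocated interfaces resolve after this patch, together with a `Partitioning` that follows the documented contract of returning `false` for unrecognized distributions:
```
// New import layout: streaming reader/writer interfaces live under
// reader.streaming / writer.streaming, the read/write support mix-ins under
// reader / writer, and partitioning classes under reader.partitioning.
import org.apache.spark.sql.sources.v2.reader.{ContinuousReadSupport, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset}
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter

// Hypothetical Partitioning that clusters on a single "key" column across 10 partitions.
// Unrecognized Distribution implementations map to false, since Spark may add new
// concrete Distributions in future releases.
class KeyClusteredPartitioning extends Partitioning {
  override def numPartitions(): Int = 10

  override def satisfy(distribution: Distribution): Boolean = distribution match {
    case c: ClusteredDistribution => c.clusteredColumns.contains("key")
    case _ => false
  }
}
```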
## How was this patch tested?
Unit test.

Author: Wang Gengliang <ltn...@gmail.com>

Closes #20435 from gengliangwang/new_pkg.

(cherry picked from commit 56ae32657e9e5d1e30b62afe77d9e14eb07cf4fb)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59e89a29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59e89a29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59e89a29

Branch: refs/heads/branch-2.3
Commit: 59e89a2990e4f66839c91f48f41157dac6e670ad
Parents: 0d0f579
Author: Wang Gengliang <ltn...@gmail.com>
Authored: Wed Jan 31 20:33:51 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Wed Jan 31 20:34:01 2018 -0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    |  2 +-
 .../spark/sql/kafka010/KafkaSourceOffset.scala  |  2 +-
 .../sql/kafka010/KafkaSourceProvider.scala      |  5 +-
 .../spark/sql/kafka010/KafkaStreamWriter.scala  |  2 +-
 .../v2/reader/ClusteredDistribution.java        | 38 ----------
 .../v2/reader/ContinuousReadSupport.java        | 48 ++++++++++++
 .../sql/sources/v2/reader/Distribution.java     | 39 ----------
 .../v2/reader/MicroBatchReadSupport.java        | 54 +++++++++++++
 .../sql/sources/v2/reader/Partitioning.java     | 46 ------------
 .../v2/reader/SupportsReportPartitioning.java   |  1 +
 .../partitioning/ClusteredDistribution.java     | 39 ++++++++++
 .../v2/reader/partitioning/Distribution.java    | 40 ++++++++++
 .../v2/reader/partitioning/Partitioning.java    | 48 ++++++++++++
 .../reader/streaming/ContinuousDataReader.java  | 36 +++++++++
 .../v2/reader/streaming/ContinuousReader.java   | 79 ++++++++++++++++++++
 .../v2/reader/streaming/MicroBatchReader.java   | 75 +++++++++++++++++++
 .../sql/sources/v2/reader/streaming/Offset.java | 69 +++++++++++++++++
 .../v2/reader/streaming/PartitionOffset.java    | 32 ++++++++
 .../v2/streaming/ContinuousReadSupport.java     | 48 ------------
 .../v2/streaming/MicroBatchReadSupport.java     | 54 -------------
 .../v2/streaming/StreamWriteSupport.java        | 54 -------------
 .../streaming/reader/ContinuousDataReader.java  | 36 ---------
 .../v2/streaming/reader/ContinuousReader.java   | 79 --------------------
 .../v2/streaming/reader/MicroBatchReader.java   | 75 -------------------
 .../sql/sources/v2/streaming/reader/Offset.java | 69 -----------------
 .../v2/streaming/reader/PartitionOffset.java    | 32 --------
 .../v2/streaming/writer/StreamWriter.java       | 72 ------------------
 .../sql/sources/v2/writer/DataSourceWriter.java |  2 +-
 .../sources/v2/writer/StreamWriteSupport.java   | 53 +++++++++++++
 .../v2/writer/streaming/StreamWriter.java       | 72 ++++++++++++++++++
 .../datasources/v2/DataSourcePartitioning.scala |  2 +-
 .../datasources/v2/DataSourceV2ScanExec.scala   |  2 +-
 .../datasources/v2/WriteToDataSourceV2.scala    |  2 +-
 .../streaming/MicroBatchExecution.scala         |  6 +-
 .../streaming/RateSourceProvider.scala          |  5 +-
 .../execution/streaming/RateStreamOffset.scala  |  2 +-
 .../execution/streaming/StreamingRelation.scala |  2 +-
 .../spark/sql/execution/streaming/console.scala |  4 +-
 .../ContinuousDataSourceRDDIter.scala           | 10 +--
 .../continuous/ContinuousExecution.scala        |  5 +-
 .../continuous/ContinuousRateStreamSource.scala |  2 +-
 .../streaming/continuous/EpochCoordinator.scala |  4 +-
 .../streaming/sources/ConsoleWriter.scala       |  2 +-
 .../streaming/sources/MicroBatchWriter.scala    |  2 +-
 .../streaming/sources/RateStreamSourceV2.scala  |  3 +-
 .../execution/streaming/sources/memoryV2.scala  |  3 +-
 .../spark/sql/streaming/DataStreamReader.scala  |  2 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  2 +-
 .../sql/streaming/StreamingQueryManager.scala   |  2 +-
 .../v2/JavaPartitionAwareDataSource.java        |  3 +
 .../execution/streaming/RateSourceV2Suite.scala |  2 +-
 .../sql/sources/v2/DataSourceV2Suite.scala      |  1 +
 .../sources/StreamingDataSourceV2Suite.scala    |  8 +-
 53 files changed, 690 insertions(+), 687 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 8c73342..41c443b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRo
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index c82154c..8d41c0d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, PartitionOffset}
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 85e96b6..694ca76 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -31,8 +31,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
index a24efde..9307bfc 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -22,8 +22,8 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
-import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.types.StructType
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
deleted file mode 100644
index 27905e3..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * A concrete implementation of {@link Distribution}. Represents a 
distribution where records that
- * share the same values for the {@link #clusteredColumns} will be produced by 
the same
- * {@link DataReader}.
- */
-@InterfaceStability.Evolving
-public class ClusteredDistribution implements Distribution {
-
-  /**
-   * The names of the clustered columns. Note that they are order insensitive.
-   */
-  public final String[] clusteredColumns;
-
-  public ClusteredDistribution(String[] clusteredColumns) {
-    this.clusteredColumns = clusteredColumns;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java
new file mode 100644
index 0000000..0c1d5d1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data reading ability for continuous stream processing.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends DataSourceV2 {
+  /**
+   * Creates a {@link ContinuousReader} to scan the data from this data source.
+   *
+   * @param schema the user provided schema, or empty() if none was provided
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be 
used for failure
+   *                           recovery. Readers for the same logical source 
in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is 
an immutable
+   *                case-insensitive string-to-string map.
+   */
+  ContinuousReader createContinuousReader(
+    Optional<StructType> schema,
+    String checkpointLocation,
+    DataSourceOptions options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
deleted file mode 100644
index b375621..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An interface to represent data distribution requirement, which specifies 
how the records should
- * be distributed among the data partitions(one {@link DataReader} outputs 
data for one partition).
- * Note that this interface has nothing to do with the data ordering inside one
- * partition(the output records of a single {@link DataReader}).
- *
- * The instance of this interface is created and provided by Spark, then 
consumed by
- * {@link Partitioning#satisfy(Distribution)}. This means data source 
developers don't need to
- * implement this interface, but need to catch as more concrete 
implementations of this interface
- * as possible in {@link Partitioning#satisfy(Distribution)}.
- *
- * Concrete implementations until now:
- * <ul>
- *   <li>{@link ClusteredDistribution}</li>
- * </ul>
- */
-@InterfaceStability.Evolving
-public interface Distribution {}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java
new file mode 100644
index 0000000..5e8f0c0
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide streaming micro-batch data reading ability.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupport extends DataSourceV2 {
+  /**
+   * Creates a {@link MicroBatchReader} to read batches of data from this data 
source in a
+   * streaming query.
+   *
+   * The execution engine will create a micro-batch reader at the start of a 
streaming query,
+   * alternate calls to setOffsetRange and createDataReaderFactories for each 
batch to process, and
+   * then call stop() when the execution is complete. Note that a single query 
may have multiple
+   * executions due to restart or failure recovery.
+   *
+   * @param schema the user provided schema, or empty() if none was provided
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be 
used for failure
+   *                           recovery. Readers for the same logical source 
in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is 
an immutable
+   *                case-insensitive string-to-string map.
+   */
+  MicroBatchReader createMicroBatchReader(
+      Optional<StructType> schema,
+      String checkpointLocation,
+      DataSourceOptions options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
deleted file mode 100644
index 5e334d1..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An interface to represent the output data partitioning for a data source, 
which is returned by
- * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this 
should work like a
- * snapshot. Once created, it should be deterministic and always report the 
same number of
- * partitions and the same "satisfy" result for a certain distribution.
- */
-@InterfaceStability.Evolving
-public interface Partitioning {
-
-  /**
-   * Returns the number of partitions(i.e., {@link DataReaderFactory}s) the 
data source outputs.
-   */
-  int numPartitions();
-
-  /**
-   * Returns true if this partitioning can satisfy the given distribution, 
which means Spark does
-   * not need to shuffle the output data of this data source for some certain 
operations.
-   *
-   * Note that, Spark may add new concrete implementations of {@link 
Distribution} in new releases.
-   * This method should be aware of it and always return false for 
unrecognized distributions. It's
-   * recommended to check every Spark new release and support new 
distributions if possible, to
-   * avoid shuffle at Spark side for more cases.
-   */
-  boolean satisfy(Distribution distribution);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
index a2383a9..5405a91 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources.v2.reader;
 
 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
 
 /**
  * A mix in interface for {@link DataSourceReader}. Data source readers can implement this

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
new file mode 100644
index 0000000..2d0ee50
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.partitioning;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+
+/**
+ * A concrete implementation of {@link Distribution}. Represents a 
distribution where records that
+ * share the same values for the {@link #clusteredColumns} will be produced by 
the same
+ * {@link DataReader}.
+ */
+@InterfaceStability.Evolving
+public class ClusteredDistribution implements Distribution {
+
+  /**
+   * The names of the clustered columns. Note that they are order insensitive.
+   */
+  public final String[] clusteredColumns;
+
+  public ClusteredDistribution(String[] clusteredColumns) {
+    this.clusteredColumns = clusteredColumns;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
new file mode 100644
index 0000000..f6b111f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.partitioning;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+
+/**
+ * An interface to represent data distribution requirement, which specifies 
how the records should
+ * be distributed among the data partitions(one {@link DataReader} outputs 
data for one partition).
+ * Note that this interface has nothing to do with the data ordering inside one
+ * partition(the output records of a single {@link DataReader}).
+ *
+ * The instance of this interface is created and provided by Spark, then 
consumed by
+ * {@link Partitioning#satisfy(Distribution)}. This means data source 
developers don't need to
+ * implement this interface, but need to catch as more concrete 
implementations of this interface
+ * as possible in {@link Partitioning#satisfy(Distribution)}.
+ *
+ * Concrete implementations until now:
+ * <ul>
+ *   <li>{@link ClusteredDistribution}</li>
+ * </ul>
+ */
+@InterfaceStability.Evolving
+public interface Distribution {}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
new file mode 100644
index 0000000..309d9e5
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.partitioning;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
+
+/**
+ * An interface to represent the output data partitioning for a data source, 
which is returned by
+ * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this 
should work like a
+ * snapshot. Once created, it should be deterministic and always report the 
same number of
+ * partitions and the same "satisfy" result for a certain distribution.
+ */
+@InterfaceStability.Evolving
+public interface Partitioning {
+
+  /**
+   * Returns the number of partitions(i.e., {@link DataReaderFactory}s) the 
data source outputs.
+   */
+  int numPartitions();
+
+  /**
+   * Returns true if this partitioning can satisfy the given distribution, 
which means Spark does
+   * not need to shuffle the output data of this data source for some certain 
operations.
+   *
+   * Note that, Spark may add new concrete implementations of {@link 
Distribution} in new releases.
+   * This method should be aware of it and always return false for 
unrecognized distributions. It's
+   * recommended to check every Spark new release and support new 
distributions if possible, to
+   * avoid shuffle at Spark side for more cases.
+   */
+  boolean satisfy(Distribution distribution);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java
new file mode 100644
index 0000000..47d2644
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+
+/**
+ * A variation on {@link DataReader} for use with streaming in continuous 
processing mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousDataReader<T> extends DataReader<T> {
+    /**
+     * Get the offset of the current record, or the start offset if no records 
have been read.
+     *
+     * The execution engine will call this method along with get() to keep 
track of the current
+     * offset. When an epoch ends, the offset of the previous record in each 
partition will be saved
+     * as a restart checkpoint.
+     */
+    PartitionOffset getOffset();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
new file mode 100644
index 0000000..d1d1e7f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can 
implement this
+ * interface to allow reading in a continuous processing mode stream.
+ *
+ * Implementations must ensure each reader factory output is a {@link 
ContinuousDataReader}.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain 
compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 
completely.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReader extends BaseStreamingSource, 
DataSourceReader {
+    /**
+     * Merge partitioned offsets coming from {@link ContinuousDataReader} 
instances for each
+     * partition to a single global offset.
+     */
+    Offset mergeOffsets(PartitionOffset[] offsets);
+
+    /**
+     * Deserialize a JSON string into an Offset of the implementation-defined 
offset type.
+     * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+     */
+    Offset deserializeOffset(String json);
+
+    /**
+     * Set the desired start offset for reader factories created from this 
reader. The scan will
+     * start from the first record after the provided offset, or from an 
implementation-defined
+     * inferred starting point if no offset is provided.
+     */
+    void setOffset(Optional<Offset> start);
+
+    /**
+     * Return the specified or inferred start offset for this reader.
+     *
+     * @throws IllegalStateException if setOffset has not been called
+     */
+    Offset getStartOffset();
+
+    /**
+     * The execution engine will call this method in every epoch to determine 
if new reader
+     * factories need to be generated, which may be required if for example 
the underlying
+     * source system has had partitions added or removed.
+     *
+     * If true, the query will be shut down and restarted with a new reader.
+     */
+    default boolean needsReconfiguration() {
+        return false;
+    }
+
+    /**
+     * Informs the source that Spark has completed processing all data for 
offsets less than or
+     * equal to `end` and will only request offsets greater than `end` in the 
future.
+     */
+    void commit(Offset end);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
new file mode 100644
index 0000000..67ebde3
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can 
implement this
+ * interface to indicate they allow micro-batch streaming reads.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain 
compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 
completely.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReader extends DataSourceReader, 
BaseStreamingSource {
+    /**
+     * Set the desired offset range for reader factories created from this 
reader. Reader factories
+     * will generate only data within (`start`, `end`]; that is, from the 
first record after `start`
+     * to the record with offset `end`.
+     *
+     * @param start The initial offset to scan from. If not specified, scan 
from an
+     *              implementation-specified start point, such as the earliest 
available record.
+     * @param end The last offset to include in the scan. If not specified, 
scan up to an
+     *            implementation-defined endpoint, such as the last available 
offset
+     *            or the start offset plus a target batch size.
+     */
+    void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
+
+    /**
+     * Returns the specified (if explicitly set through setOffsetRange) or 
inferred start offset
+     * for this reader.
+     *
+     * @throws IllegalStateException if setOffsetRange has not been called
+     */
+    Offset getStartOffset();
+
+    /**
+     * Return the specified (if explicitly set through setOffsetRange) or 
inferred end offset
+     * for this reader.
+     *
+     * @throws IllegalStateException if setOffsetRange has not been called
+     */
+    Offset getEndOffset();
+
+    /**
+     * Deserialize a JSON string into an Offset of the implementation-defined 
offset type.
+     * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+     */
+    Offset deserializeOffset(String json);
+
+    /**
+     * Informs the source that Spark has completed processing all data for 
offsets less than or
+     * equal to `end` and will only request offsets greater than `end` in the 
future.
+     */
+    void commit(Offset end);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
new file mode 100644
index 0000000..e41c035
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An abstract representation of progress through a {@link MicroBatchReader} or
+ * {@link ContinuousReader}.
+ * During execution, offsets provided by the data source implementation will 
be logged and used as
+ * restart checkpoints. Each source should provide an offset implementation 
which the source can use
+ * to reconstruct a position in the stream up to which data has been 
seen/processed.
+ *
+ * Note: This class currently extends {@link 
org.apache.spark.sql.execution.streaming.Offset} to
+ * maintain compatibility with DataSource V1 APIs. This extension will be 
removed once we
+ * get rid of V1 completely.
+ */
+@InterfaceStability.Evolving
+public abstract class Offset extends 
org.apache.spark.sql.execution.streaming.Offset {
+    /**
+     * A JSON-serialized representation of an Offset that is
+     * used for saving offsets to the offset log.
+     * Note: We assume that equivalent/equal offsets serialize to
+     * identical JSON strings.
+     *
+     * @return JSON string encoding
+     */
+    public abstract String json();
+
+    /**
+     * Equality based on JSON string representation. We leverage the
+     * JSON representation for normalization between the Offset's
+     * in deserialized and serialized representations.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
+            return this.json()
+                .equals(((org.apache.spark.sql.execution.streaming.Offset) 
obj).json());
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return this.json().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return this.json();
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java
new file mode 100644
index 0000000..383e73d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * Used for per-partition offsets in continuous processing. ContinuousReader 
implementations will
+ * provide a method to merge these into a global Offset.
+ *
+ * These offsets must be serializable.
+ */
+@InterfaceStability.Evolving
+public interface PartitionOffset extends Serializable {
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
deleted file mode 100644
index f79424e..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data reading ability for continuous stream processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousReadSupport extends DataSourceV2 {
-  /**
-   * Creates a {@link ContinuousReader} to scan the data from this data source.
-   *
-   * @param schema the user provided schema, or empty() if none was provided
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be 
used for failure
-   *                           recovery. Readers for the same logical source 
in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is 
an immutable
-   *                case-insensitive string-to-string map.
-   */
-  ContinuousReader createContinuousReader(
-    Optional<StructType> schema,
-    String checkpointLocation,
-    DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
deleted file mode 100644
index 22660e4..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide streaming micro-batch data reading ability.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchReadSupport extends DataSourceV2 {
-  /**
-   * Creates a {@link MicroBatchReader} to read batches of data from this data 
source in a
-   * streaming query.
-   *
-   * The execution engine will create a micro-batch reader at the start of a 
streaming query,
-   * alternate calls to setOffsetRange and createDataReaderFactories for each 
batch to process, and
-   * then call stop() when the execution is complete. Note that a single query 
may have multiple
-   * executions due to restart or failure recovery.
-   *
-   * @param schema the user provided schema, or empty() if none was provided
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be 
used for failure
-   *                           recovery. Readers for the same logical source 
in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is 
an immutable
-   *                case-insensitive string-to-string map.
-   */
-  MicroBatchReader createMicroBatchReader(
-      Optional<StructType> schema,
-      String checkpointLocation,
-      DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
deleted file mode 100644
index 7c5f304..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data writing ability for structured streaming.
- */
-@InterfaceStability.Evolving
-public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
-
-    /**
-     * Creates an optional {@link StreamWriter} to save the data to this data 
source. Data
-     * sources can return None if there is no writing needed to be done.
-     *
-     * @param queryId A unique string for the writing query. It's possible 
that there are many
-     *                writing queries running at the same time, and the 
returned
-     *                {@link DataSourceWriter} can use this id to distinguish 
itself from others.
-     * @param schema the schema of the data to be written.
-     * @param mode the output mode which determines what successive epoch 
output means to this
-     *             sink, please refer to {@link OutputMode} for more details.
-     * @param options the options for the returned data source writer, which 
is an immutable
-     *                case-insensitive string-to-string map.
-     */
-    StreamWriter createStreamWriter(
-        String queryId,
-        StructType schema,
-        OutputMode mode,
-        DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
deleted file mode 100644
index 3f13a4d..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
-
-/**
- * A variation on {@link DataReader} for use with streaming in continuous processing mode.
- */
-@InterfaceStability.Evolving
-public interface ContinuousDataReader<T> extends DataReader<T> {
-    /**
-     * Get the offset of the current record, or the start offset if no records have been read.
-     *
-     * The execution engine will call this method along with get() to keep track of the current
-     * offset. When an epoch ends, the offset of the previous record in each partition will be saved
-     * as a restart checkpoint.
-     */
-    PartitionOffset getOffset();
-}

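For reference, a minimal sketch of what an implementation of this interface looks like at its new home, org.apache.spark.sql.sources.v2.reader.streaming. The counter source and CounterPartitionOffset are hypothetical, and the next()/get()/close() methods are assumed from the v2 DataReader contract:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}

// Hypothetical per-partition offset: the counter value last emitted for one partition.
case class CounterPartitionOffset(partition: Int, value: Long) extends PartitionOffset

// Emits an ever-increasing counter for a single partition; a continuous source never runs dry.
class CounterContinuousDataReader(partition: Int, startValue: Long)
    extends ContinuousDataReader[Row] {

  private var current = startValue

  override def next(): Boolean = { current += 1; true }

  override def get(): Row = Row(partition, current)

  // Reported alongside get() so the engine can checkpoint per-partition progress each epoch.
  override def getOffset(): PartitionOffset = CounterPartitionOffset(partition, current)

  override def close(): Unit = ()
}
```
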
http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
deleted file mode 100644
index 6e5177e..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-
-import java.util.Optional;
-
-/**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
- * interface to allow reading in a continuous processing mode stream.
- *
- * Implementations must ensure each reader factory output is a {@link ContinuousDataReader}.
- *
- * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
- * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
- */
-@InterfaceStability.Evolving
-public interface ContinuousReader extends BaseStreamingSource, DataSourceReader {
-    /**
-     * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
-     * partition to a single global offset.
-     */
-    Offset mergeOffsets(PartitionOffset[] offsets);
-
-    /**
-     * Deserialize a JSON string into an Offset of the implementation-defined offset type.
-     * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
-     */
-    Offset deserializeOffset(String json);
-
-    /**
-     * Set the desired start offset for reader factories created from this reader. The scan will
-     * start from the first record after the provided offset, or from an implementation-defined
-     * inferred starting point if no offset is provided.
-     */
-    void setOffset(Optional<Offset> start);
-
-    /**
-     * Return the specified or inferred start offset for this reader.
-     *
-     * @throws IllegalStateException if setOffset has not been called
-     */
-    Offset getStartOffset();
-
-    /**
-     * The execution engine will call this method in every epoch to determine if new reader
-     * factories need to be generated, which may be required if for example the underlying
-     * source system has had partitions added or removed.
-     *
-     * If true, the query will be shut down and restarted with a new reader.
-     */
-    default boolean needsReconfiguration() {
-        return false;
-    }
-
-    /**
-     * Informs the source that Spark has completed processing all data for offsets less than or
-     * equal to `end` and will only request offsets greater than `end` in the future.
-     */
-    void commit(Offset end);
-}

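A rough sketch of the streaming-specific half of this contract at its new location, org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader. Only the methods declared in this file are implemented; the inherited DataSourceReader methods (schema, reader factories) are left to a concrete subclass, and CounterOffset plus CounterPartitionOffset (repeated from the reader sketch above so the block stands alone) are hypothetical:

```scala
import java.util.Optional

import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, Offset, PartitionOffset}

// Hypothetical global offset: the highest counter value seen across partitions.
class CounterOffset(val value: Long) extends Offset {
  override def json(): String = value.toString
}

case class CounterPartitionOffset(partition: Int, value: Long) extends PartitionOffset

// Streaming-specific methods only; everything inherited from DataSourceReader stays abstract.
abstract class CounterContinuousReader extends ContinuousReader {

  private var start: Optional[Offset] = Optional.empty[Offset]()

  // Collapse the per-partition counters into one global offset (assumes non-empty input).
  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset =
    new CounterOffset(offsets.map { case CounterPartitionOffset(_, v) => v }.max)

  override def deserializeOffset(json: String): Offset = new CounterOffset(json.toLong)

  override def setOffset(offset: Optional[Offset]): Unit = { start = offset }

  override def getStartOffset(): Offset = {
    if (!start.isPresent) throw new IllegalStateException("setOffset has not been called")
    start.get
  }

  // Nothing to clean up in this in-memory example; a real source would trim data <= `end`.
  override def commit(end: Offset): Unit = ()
}
```
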
http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
deleted file mode 100644
index fcec446..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-
-import java.util.Optional;
-
-/**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
- * interface to indicate they allow micro-batch streaming reads.
- *
- * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
- * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource {
-    /**
-     * Set the desired offset range for reader factories created from this reader. Reader factories
-     * will generate only data within (`start`, `end`]; that is, from the first record after `start`
-     * to the record with offset `end`.
-     *
-     * @param start The initial offset to scan from. If not specified, scan from an
-     *              implementation-specified start point, such as the earliest available record.
-     * @param end The last offset to include in the scan. If not specified, scan up to an
-     *            implementation-defined endpoint, such as the last available offset
-     *            or the start offset plus a target batch size.
-     */
-    void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
-
-    /**
-     * Returns the specified (if explicitly set through setOffsetRange) or inferred start offset
-     * for this reader.
-     *
-     * @throws IllegalStateException if setOffsetRange has not been called
-     */
-    Offset getStartOffset();
-
-    /**
-     * Return the specified (if explicitly set through setOffsetRange) or inferred end offset
-     * for this reader.
-     *
-     * @throws IllegalStateException if setOffsetRange has not been called
-     */
-    Offset getEndOffset();
-
-    /**
-     * Deserialize a JSON string into an Offset of the implementation-defined offset type.
-     * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
-     */
-    Offset deserializeOffset(String json);
-
-    /**
-     * Informs the source that Spark has completed processing all data for offsets less than or
-     * equal to `end` and will only request offsets greater than `end` in the future.
-     */
-    void commit(Offset end);
-}

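Likewise, a hedged sketch of the micro-batch contract at org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader; schema and reader-factory creation are again left to a concrete subclass, and CounterOffset and latestOffset() are hypothetical:

```scala
import java.util.Optional

import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}

// Same hypothetical counter offset as in the continuous sketch above.
class CounterOffset(val value: Long) extends Offset {
  override def json(): String = value.toString
}

// Streaming-specific methods only; the inherited DataSourceReader methods stay abstract.
abstract class CounterMicroBatchReader extends MicroBatchReader {

  private var range: Option[(Offset, Offset)] = None

  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
    // Fall back to implementation-defined endpoints when the engine passes empty Optionals.
    val startOffset = if (start.isPresent) start.get else new CounterOffset(0L)
    val endOffset = if (end.isPresent) end.get else latestOffset()
    range = Some((startOffset, endOffset))
  }

  override def getStartOffset(): Offset =
    range.map(_._1).getOrElse(throw new IllegalStateException("setOffsetRange not called"))

  override def getEndOffset(): Offset =
    range.map(_._2).getOrElse(throw new IllegalStateException("setOffsetRange not called"))

  override def deserializeOffset(json: String): Offset = new CounterOffset(json.toLong)

  override def commit(end: Offset): Unit = ()  // nothing to prune in this sketch

  // Hypothetical helper: the most recent counter value the source could serve.
  protected def latestOffset(): CounterOffset
}
```
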
http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
deleted file mode 100644
index abba3e7..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An abstract representation of progress through a {@link MicroBatchReader} or
- * {@link ContinuousReader}.
- * During execution, offsets provided by the data source implementation will be logged and used as
- * restart checkpoints. Each source should provide an offset implementation which the source can use
- * to reconstruct a position in the stream up to which data has been seen/processed.
- *
- * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to
- * maintain compatibility with DataSource V1 APIs. This extension will be removed once we
- * get rid of V1 completely.
- */
-@InterfaceStability.Evolving
-public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
-    /**
-     * A JSON-serialized representation of an Offset that is
-     * used for saving offsets to the offset log.
-     * Note: We assume that equivalent/equal offsets serialize to
-     * identical JSON strings.
-     *
-     * @return JSON string encoding
-     */
-    public abstract String json();
-
-    /**
-     * Equality based on JSON string representation. We leverage the
-     * JSON representation for normalization between the Offset's
-     * in deserialized and serialized representations.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
-            return this.json()
-                .equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return this.json().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return this.json();
-    }
-}

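Because equals/hashCode above delegate to json(), offsets that serialize identically compare equal; with the hypothetical CounterOffset from the sketches above:

```scala
// Equality and hashing are delegated to json(), so these assertions hold.
assert(new CounterOffset(42L) == new CounterOffset(42L))
assert(new CounterOffset(42L).hashCode == new CounterOffset(42L).hashCode)
assert(new CounterOffset(42L) != new CounterOffset(43L))
```
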
http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
deleted file mode 100644
index 4688b85..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.reader;
-
-import java.io.Serializable;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * Used for per-partition offsets in continuous processing. ContinuousReader implementations will
- * provide a method to merge these into a global Offset.
- *
- * These offsets must be serializable.
- */
-@InterfaceStability.Evolving
-public interface PartitionOffset extends Serializable {
-}

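Since the interface is only a serializable marker, a per-partition offset is usually a small case class; for example, a hypothetical partitioned-log source might use:

```scala
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset

// Hypothetical: where a reader stands within one partition of a partitioned log.
case class LogPartitionOffset(topic: String, partition: Int, position: Long) extends PartitionOffset
```
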
http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
deleted file mode 100644
index 915ee6c..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.writer;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-
-/**
- * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
- * aborts relative to an epoch ID determined by the execution engine.
- *
- * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
- * and so must reset any internal state after a successful commit.
- */
-@InterfaceStability.Evolving
-public interface StreamWriter extends DataSourceWriter {
-  /**
-   * Commits this writing job for the specified epoch with a list of commit messages. The commit
-   * messages are collected from successful data writers and are produced by
-   * {@link DataWriter#commit()}.
-   *
-   * If this method fails (by throwing an exception), this writing job is considered to have been
-   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
-   *
-   * To support exactly-once processing, writer implementations should ensure that this method is
-   * idempotent. The execution engine may call commit() multiple times for the same epoch
-   * in some circumstances.
-   */
-  void commit(long epochId, WriterCommitMessage[] messages);
-
-  /**
-   * Aborts this writing job because some data writers are failed and keep failing when retry, or
-   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
-   *
-   * If this method fails (by throwing an exception), the underlying data source may require manual
-   * cleanup.
-   *
-   * Unless the abort is triggered by the failure of commit, the given messages should have some
-   * null slots as there maybe only a few data writers that are committed before the abort
-   * happens, or some data writers were committed but their commit messages haven't reached the
-   * driver when the abort is triggered. So this is just a "best effort" for data sources to
-   * clean up the data left by data writers.
-   */
-  void abort(long epochId, WriterCommitMessage[] messages);
-
-  default void commit(WriterCommitMessage[] messages) {
-    throw new UnsupportedOperationException(
-        "Commit without epoch should not be called with StreamWriter");
-  }
-
-  default void abort(WriterCommitMessage[] messages) {
-    throw new UnsupportedOperationException(
-        "Abort without epoch should not be called with StreamWriter");
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
index d89d27d..7096aec 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType;
 /**
  * A data source writer that is returned by
  * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceOptions)}/
- * {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter(
+ * {@link StreamWriteSupport#createStreamWriter(
  * String, StructType, OutputMode, DataSourceOptions)}.
  * It can mix in various writing optimization interfaces to speed up the data saving. The actual
  * writing logic is delegated to {@link DataWriter}.

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java
new file mode 100644
index 0000000..1c0e2e1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability for structured streaming.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
+
+    /**
+     * Creates an optional {@link StreamWriter} to save the data to this data source. Data
+     * sources can return None if there is no writing needed to be done.
+     *
+     * @param queryId A unique string for the writing query. It's possible that there are many
+     *                writing queries running at the same time, and the returned
+     *                {@link DataSourceWriter} can use this id to distinguish itself from others.
+     * @param schema the schema of the data to be written.
+     * @param mode the output mode which determines what successive epoch output means to this
+     *             sink, please refer to {@link OutputMode} for more details.
+     * @param options the options for the returned data source writer, which is an immutable
+     *                case-insensitive string-to-string map.
+     */
+    StreamWriter createStreamWriter(
+        String queryId,
+        StructType schema,
+        OutputMode mode,
+        DataSourceOptions options);
+}

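A sink provider mixes this interface into its DataSourceV2 entry point; the hypothetical LoggingSinkProvider below validates the output mode and would return the sink's StreamWriter, sketched after the next hunk:

```scala
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

// Hypothetical sink provider; StreamWriteSupport already mixes in DataSourceV2, so this class
// is a complete v2 entry point for the streaming write path.
class LoggingSinkProvider extends StreamWriteSupport {

  override def createStreamWriter(
      queryId: String,
      schema: StructType,
      mode: OutputMode,
      options: DataSourceOptions): StreamWriter = {
    require(mode == OutputMode.Append(), s"unsupported output mode: $mode")
    // Return the sink's StreamWriter here, e.g. the LoggingStreamWriter sketched below.
    ???
  }
}
```
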
http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
new file mode 100644
index 0000000..4913341
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+/**
+ * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
+ * aborts relative to an epoch ID determined by the execution engine.
+ *
+ * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
+ * and so must reset any internal state after a successful commit.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriter extends DataSourceWriter {
+  /**
+   * Commits this writing job for the specified epoch with a list of commit messages. The commit
+   * messages are collected from successful data writers and are produced by
+   * {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have been
+   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
+   *
+   * To support exactly-once processing, writer implementations should ensure that this method is
+   * idempotent. The execution engine may call commit() multiple times for the same epoch
+   * in some circumstances.
+   */
+  void commit(long epochId, WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed and keep failing when retry, or
+   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
+   *
+   * If this method fails (by throwing an exception), the underlying data source may require manual
+   * cleanup.
+   *
+   * Unless the abort is triggered by the failure of commit, the given messages should have some
+   * null slots as there maybe only a few data writers that are committed before the abort
+   * happens, or some data writers were committed but their commit messages haven't reached the
+   * driver when the abort is triggered. So this is just a "best effort" for data sources to
+   * clean up the data left by data writers.
+   */
+  void abort(long epochId, WriterCommitMessage[] messages);
+
+  default void commit(WriterCommitMessage[] messages) {
+    throw new UnsupportedOperationException(
+        "Commit without epoch should not be called with StreamWriter");
+  }
+
+  default void abort(WriterCommitMessage[] messages) {
+    throw new UnsupportedOperationException(
+        "Abort without epoch should not be called with StreamWriter");
+  }
+}

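A hedged sketch of the epoch-aware half of this contract (hypothetical LoggingStreamWriter); createWriterFactory() is left to a concrete subclass, and the non-epoch commit/abort keep the throwing defaults above, since the engine always drives a StreamWriter through the epoch variants:

```scala
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter

// Hypothetical sink writer: only the epoch-aware commit/abort pair is shown; the
// writer-factory creation inherited from DataSourceWriter stays abstract.
abstract class LoggingStreamWriter(queryId: String) extends StreamWriter {

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Per the contract above, the engine may deliver the same epoch more than once,
    // so a real sink must make this idempotent (e.g. keyed on (queryId, epochId)).
    println(s"query $queryId: committed epoch $epochId with ${messages.length} writer messages")
  }

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Best effort: `messages` may contain null slots for writers that never committed.
    println(s"query $queryId: aborted epoch $epochId")
  }
}
```
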
http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
index 943d010..017a673 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression}
 import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Partitioning}
+import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Partitioning}
 
 /**
  * An adapter from public data source partitioning to catalyst internal `Partitioning`.

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index ee08582..df469af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
 import org.apache.spark.sql.types.StructType
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index c544adb..6592bd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
-import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 93572f7..d9aa857 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -30,9 +30,9 @@ import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
-import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
+import org.apache.spark.sql.sources.v2.reader.MicroBatchReadSupport
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.writer.{StreamWriteSupport, SupportsWriteInternalRow}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, Utils}
 

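For third-party connectors built against earlier 2.3 snapshots, the whole change is import-only; a hypothetical connector would migrate like this:

```scala
// Before this commit:
//   import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
//   import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, Offset, PartitionOffset}
//   import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
//   import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Distribution, Partitioning}

// After this commit:
import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport
import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
```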
