http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
deleted file mode 100644
index f403dc6..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
-import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability for batch processing.
- *
- * This interface is used to create {@link BatchReadSupport} instances when end users run
- * {@code SparkSession.read.format(...).option(...).load()}.
- */
-@InterfaceStability.Evolving
-public interface BatchReadSupportProvider extends DataSourceV2 {
-
-  /**
-   * Creates a {@link BatchReadSupport} instance to load the data from this data source with a user
-   * specified schema, which is called by Spark at the beginning of each batch query.
-   *
-   * Spark will call this method at the beginning of each batch query to create a
-   * {@link BatchReadSupport} instance.
-   *
-   * By default this method throws {@link UnsupportedOperationException}, implementations should
-   * override this method to handle user specified schema.
-   *
-   * @param schema the user specified schema.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  default BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) {
-    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
-  }
-
-  /**
-   * Creates a {@link BatchReadSupport} instance to scan the data from this data source, which is
-   * called by Spark at the beginning of each batch query.
-   *
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  BatchReadSupport createBatchReadSupport(DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
deleted file mode 100644
index bd10c33..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for batch processing.
- *
- * This interface is used to create {@link BatchWriteSupport} instances when end users run
- * {@code Dataset.write.format(...).option(...).save()}.
- */
-@InterfaceStability.Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
-
-  /**
-   * Creates an optional {@link BatchWriteSupport} instance to save the data to this data source,
-   * which is called by Spark at the beginning of each batch query.
-   *
-   * Data sources can return None if there is no writing needed to be done according to the save
-   * mode.
-   *
-   * @param queryId A unique string for the writing query. It's possible that there are many
-   *                writing queries running at the same time, and the returned
-   *                {@link BatchWriteSupport} can use this id to distinguish itself from others.
-   * @param schema the schema of the data to be written.
-   * @param mode the save mode which determines what to do when the data are already in this data
-   *             source, please refer to {@link SaveMode} for more details.
-   * @param options the options for the returned data source writer, which is an immutable
-   *                case-insensitive string-to-string map.
-   * @return a write support to write data to this data source.
-   */
-  Optional<BatchWriteSupport> createBatchWriteSupport(
-      String queryId,
-      StructType schema,
-      SaveMode mode,
-      DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
new file mode 100644
index 0000000..7df5a45
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data reading ability for continuous stream processing.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends DataSourceV2 {
+  /**
+   * Creates a {@link ContinuousReader} to scan the data from this data source.
+   *
+   * @param schema the user provided schema, or empty() if none was provided
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Readers for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  ContinuousReader createContinuousReader(
+    Optional<StructType> schema,
+    String checkpointLocation,
+    DataSourceOptions options);
+}
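
For orientation, a minimal, hypothetical sketch of how an end user would exercise a ContinuousReadSupport implementation: the continuous trigger is what makes Spark go through createContinuousReader. The built-in "rate" source and "console" sink stand in for a custom provider here; none of this is part of the patch.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.Trigger;

    public final class ContinuousReadExample {
      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .master("local[2]").appName("continuous-demo").getOrCreate();

        // Any DataSourceV2 that implements ContinuousReadSupport could be named in format(...);
        // the built-in rate source is used purely as a stand-in.
        Dataset<Row> stream = spark.readStream().format("rate").load();

        // The continuous trigger is what routes the query through createContinuousReader
        // instead of the micro-batch path.
        StreamingQuery query = stream.writeStream()
            .format("console")
            .trigger(Trigger.Continuous("1 second"))
            .start();
        query.awaitTermination();
      }
    }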

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
deleted file mode 100644
index 824c290..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability for continuous stream processing.
- *
- * This interface is used to create {@link ContinuousReadSupport} instances when end users run
- * {@code SparkSession.readStream.format(...).option(...).load()} with a continuous trigger.
- */
-@InterfaceStability.Evolving
-public interface ContinuousReadSupportProvider extends DataSourceV2 {
-
-  /**
-   * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
-   * source with a user specified schema, which is called by Spark at the beginning of each
-   * continuous streaming query.
-   *
-   * By default this method throws {@link UnsupportedOperationException}, implementations should
-   * override this method to handle user specified schema.
-   *
-   * @param schema the user provided schema.
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
-   *                           recovery. Readers for the same logical source in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  default ContinuousReadSupport createContinuousReadSupport(
-      StructType schema,
-      String checkpointLocation,
-      DataSourceOptions options) {
-    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
-  }
-
-  /**
-   * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
-   * source, which is called by Spark at the beginning of each continuous streaming query.
-   *
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
-   *                           recovery. Readers for the same logical source in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  ContinuousReadSupport createContinuousReadSupport(
-      String checkpointLocation,
-      DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
index 6e31e84..6234071 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
@@ -22,13 +22,9 @@ import org.apache.spark.annotation.InterfaceStability;
 /**
  * The base interface for data source v2. Implementations must have a public, 0-arg constructor.
  *
- * Note that this is an empty interface. Data source implementations must mix in interfaces such as
- * {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, which can provide
- * batch or streaming read/write support instances. Otherwise it's just a dummy data source which
- * is un-readable/writable.
- *
- * If Spark fails to execute any methods in the implementations of this interface (by throwing an
- * exception), the read action will fail and no Spark job will be submitted.
+ * Note that this is an empty interface. Data source implementations should mix-in at least one of
+ * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just
+ * a dummy data source which is un-readable/writable.
  */
 @InterfaceStability.Evolving
 public interface DataSourceV2 {}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
new file mode 100644
index 0000000..7f4a2c9
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide streaming micro-batch data reading ability.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupport extends DataSourceV2 {
+  /**
+   * Creates a {@link MicroBatchReader} to read batches of data from this data source in a
+   * streaming query.
+   *
+   * The execution engine will create a micro-batch reader at the start of a streaming query,
+   * alternate calls to setOffsetRange and planInputPartitions for each batch to process, and
+   * then call stop() when the execution is complete. Note that a single query may have multiple
+   * executions due to restart or failure recovery.
+   *
+   * @param schema the user provided schema, or empty() if none was provided
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Readers for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  MicroBatchReader createMicroBatchReader(
+      Optional<StructType> schema,
+      String checkpointLocation,
+      DataSourceOptions options);
+}
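
The micro-batch counterpart differs only in the trigger: with the default or a processing-time trigger, Spark calls createMicroBatchReader, and the per-source scratch directory it derives from the query's checkpoint location is what arrives as the checkpointLocation argument above. A fragment-level sketch (it would sit inside a main method like the one after the ContinuousReadSupport diff; names and paths are illustrative only):

    Dataset<Row> stream = spark.readStream().format("rate").load();

    StreamingQuery query = stream.writeStream()
        .format("console")
        // The query checkpoint; Spark derives the per-source checkpointLocation from it.
        .option("checkpointLocation", "/tmp/demo-checkpoint")
        // A processing-time (micro-batch) trigger means createMicroBatchReader is used.
        .trigger(Trigger.ProcessingTime("5 seconds"))
        .start();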

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
deleted file mode 100644
index 61c08e7..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability for micro-batch stream processing.
- *
- * This interface is used to create {@link MicroBatchReadSupport} instances when end users run
- * {@code SparkSession.readStream.format(...).option(...).load()} with a micro-batch trigger.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchReadSupportProvider extends DataSourceV2 {
-
-  /**
-   * Creates a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
-   * source with a user specified schema, which is called by Spark at the beginning of each
-   * micro-batch streaming query.
-   *
-   * By default this method throws {@link UnsupportedOperationException}, implementations should
-   * override this method to handle user specified schema.
-   *
-   * @param schema the user provided schema.
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
-   *                           recovery. Readers for the same logical source in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  default MicroBatchReadSupport createMicroBatchReadSupport(
-      StructType schema,
-      String checkpointLocation,
-      DataSourceOptions options) {
-    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
-  }
-
-  /**
-   * Creates a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
-   * source, which is called by Spark at the beginning of each micro-batch streaming query.
-   *
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
-   *                           recovery. Readers for the same logical source in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  MicroBatchReadSupport createMicroBatchReadSupport(
-      String checkpointLocation,
-      DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
new file mode 100644
index 0000000..80ac08e
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data reading ability and scan the data from the data source.
+ */
+@InterfaceStability.Evolving
+public interface ReadSupport extends DataSourceV2 {
+
+  /**
+   * Creates a {@link DataSourceReader} to scan the data from this data source.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   *
+   * @param schema the user specified schema.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   *
+   * By default this method throws {@link UnsupportedOperationException}, implementations should
+   * override this method to handle user specified schema.
+   */
+  default DataSourceReader createReader(StructType schema, DataSourceOptions options) {
+    String name;
+    if (this instanceof DataSourceRegister) {
+      name = ((DataSourceRegister) this).shortName();
+    } else {
+      name = this.getClass().getName();
+    }
+    throw new UnsupportedOperationException(name + " does not support user specified schema");
+  }
+
+  /**
+   * Creates a {@link DataSourceReader} to scan the data from this data source.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   *
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  DataSourceReader createReader(DataSourceOptions options);
+}
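
To see how a source plugs into this interface, here is a rough, hypothetical sketch (the class and schema are invented, not part of this patch): the provider must implement createReader(DataSourceOptions), and it may override the schema-accepting overload to accept user-specified schemas instead of inheriting the default UnsupportedOperationException.

    import java.util.Collections;
    import java.util.List;

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.sources.DataSourceRegister;
    import org.apache.spark.sql.sources.v2.DataSourceOptions;
    import org.apache.spark.sql.sources.v2.DataSourceV2;
    import org.apache.spark.sql.sources.v2.ReadSupport;
    import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.types.StructType;

    public class EmptySource implements DataSourceV2, ReadSupport, DataSourceRegister {

      // Short name usable in .format("empty") once the class is registered via
      // META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
      @Override
      public String shortName() {
        return "empty";
      }

      // Called for spark.read().format("empty").load().
      @Override
      public DataSourceReader createReader(DataSourceOptions options) {
        return createReader(new StructType().add("value", "string"), options);
      }

      // Called when the user supplies .schema(...); overriding it opts in to
      // user-specified schemas instead of the default exception.
      @Override
      public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
        return new DataSourceReader() {
          @Override
          public StructType readSchema() {
            return schema;
          }

          @Override
          public List<InputPartition<InternalRow>> planInputPartitions() {
            return Collections.emptyList(); // no data; a real source plans its splits here
          }
        };
      }
    }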

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
index bbe430e..926c6fd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
@@ -28,10 +28,9 @@ import org.apache.spark.annotation.InterfaceStability;
 public interface SessionConfigSupport extends DataSourceV2 {
 
   /**
-   * Key prefix of the session configs to propagate, which is usually the data source name. Spark
-   * will extract all session configs that starts with `spark.datasource.$keyPrefix`, turn
-   * `spark.datasource.$keyPrefix.xxx -&gt; yyy` into `xxx -&gt; yyy`, and propagate them to all
-   * data source operations in this session.
+   * Key prefix of the session configs to propagate. Spark will extract all session configs that
+   * starts with `spark.datasource.$keyPrefix`, turn `spark.datasource.$keyPrefix.xxx -&gt; yyy`
+   * into `xxx -&gt; yyy`, and propagate them to all data source operations in this session.
    */
   String keyPrefix();
 }
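
A small, hypothetical sketch of this contract (names invented): with keyPrefix() returning "mysource", a session config spark.datasource.mysource.username=jane is handed to the source's options as username=jane for every v2 read or write in that session.

    import org.apache.spark.sql.sources.v2.DataSourceV2;
    import org.apache.spark.sql.sources.v2.SessionConfigSupport;

    class MySource implements DataSourceV2, SessionConfigSupport {
      @Override
      public String keyPrefix() {
        return "mysource";
      }
    }

    // Driver side:
    //   spark.conf().set("spark.datasource.mysource.username", "jane");
    // Subsequent reads/writes against MySource in this session see the option "username" -> "jane".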

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamWriteSupport.java
new file mode 100644
index 0000000..a77b014
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamWriteSupport.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability for structured streaming.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
+
+    /**
+     * Creates an optional {@link StreamWriter} to save the data to this data source. Data
+     * sources can return None if there is no writing needed to be done.
+     *
+     * @param queryId A unique string for the writing query. It's possible that there are many
+     *                writing queries running at the same time, and the returned
+     *                {@link DataSourceWriter} can use this id to distinguish itself from others.
+     * @param schema the schema of the data to be written.
+     * @param mode the output mode which determines what successive epoch output means to this
+     *             sink, please refer to {@link OutputMode} for more details.
+     * @param options the options for the returned data source writer, which is an immutable
+     *                case-insensitive string-to-string map.
+     */
+    StreamWriter createStreamWriter(
+        String queryId,
+        StructType schema,
+        OutputMode mode,
+        DataSourceOptions options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
deleted file mode 100644
index f9ca85d..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for structured streaming.
- *
- * This interface is used to create {@link StreamingWriteSupport} instances when end users run
- * {@code Dataset.writeStream.format(...).option(...).start()}.
- */
-@InterfaceStability.Evolving
-public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {
-
-  /**
-   * Creates a {@link StreamingWriteSupport} instance to save the data to this data source, which is
-   * called by Spark at the beginning of each streaming query.
-   *
-   * @param queryId A unique string for the writing query. It's possible that there are many
-   *                writing queries running at the same time, and the returned
-   *                {@link StreamingWriteSupport} can use this id to distinguish itself from others.
-   * @param schema the schema of the data to be written.
-   * @param mode the output mode which determines what successive epoch output means to this
-   *             sink, please refer to {@link OutputMode} for more details.
-   * @param options the options for the returned data source writer, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  StreamingWriteSupport createStreamingWriteSupport(
-      String queryId,
-      StructType schema,
-      OutputMode mode,
-      DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
new file mode 100644
index 0000000..048787a
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability and save the data to the data source.
+ */
+@InterfaceStability.Evolving
+public interface WriteSupport extends DataSourceV2 {
+
+  /**
+   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
+   * sources can return None if there is no writing needed to be done according to the save mode.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   *
+   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
+   *                  jobs running at the same time, and the returned {@link DataSourceWriter} can
+   *                  use this job id to distinguish itself from other jobs.
+   * @param schema the schema of the data to be written.
+   * @param mode the save mode which determines what to do when the data are already in this data
+   *             source, please refer to {@link SaveMode} for more details.
+   * @param options the options for the returned data source writer, which is an immutable
+   *                case-insensitive string-to-string map.
+   * @return a writer to append data to this data source
+   */
+  Optional<DataSourceWriter> createWriter(
+      String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options);
+}
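
For a feel of the batch write path, a hypothetical "blackhole" sink (all names invented, not part of this patch): it accepts every SaveMode and discards rows, and a comment marks where a source would return Optional.empty() when the save mode means there is nothing to write.

    import java.io.IOException;
    import java.util.Optional;

    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.sources.v2.DataSourceOptions;
    import org.apache.spark.sql.sources.v2.DataSourceV2;
    import org.apache.spark.sql.sources.v2.WriteSupport;
    import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
    import org.apache.spark.sql.sources.v2.writer.DataWriter;
    import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
    import org.apache.spark.sql.types.StructType;

    public class BlackholeSink implements DataSourceV2, WriteSupport {

      @Override
      public Optional<DataSourceWriter> createWriter(
          String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
        // A real source would inspect `mode` here, e.g. return Optional.empty() for
        // SaveMode.Ignore when the target already exists, so that no job is launched.
        return Optional.of(new DataSourceWriter() {
          @Override
          public DataWriterFactory<InternalRow> createWriterFactory() {
            return new BlackholeWriterFactory();
          }

          @Override
          public void commit(WriterCommitMessage[] messages) { /* nothing to finalize */ }

          @Override
          public void abort(WriterCommitMessage[] messages) { /* nothing to clean up */ }
        });
      }

      // Shipped to executors, so it must stay serializable (DataWriterFactory extends Serializable).
      static class BlackholeWriterFactory implements DataWriterFactory<InternalRow> {
        @Override
        public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
          return new DataWriter<InternalRow>() {
            @Override
            public void write(InternalRow record) throws IOException { /* discard */ }

            @Override
            public WriterCommitMessage commit() throws IOException {
              return new BlackholeCommitMessage();
            }

            @Override
            public void abort() throws IOException { /* nothing buffered */ }
          };
        }
      }

      // Commit messages travel back to the driver, so keep them serializable and tiny.
      static class BlackholeCommitMessage implements WriterCommitMessage {}
    }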

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
deleted file mode 100644
index 452ee86..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An interface that defines how to load the data from data source for batch processing.
- *
- * The execution engine will get an instance of this interface from a data source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch
- * query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}.
- * The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
- * {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader
- * factory to scan data from the data source with a Spark job.
- */
-@InterfaceStability.Evolving
-public interface BatchReadSupport extends ReadSupport {
-
-  /**
-   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
-   * {@link ScanConfig} for each data scanning job.
-   *
-   * The builder can take some query specific information to do operators pushdown, and keep these
-   * information in the created {@link ScanConfig}.
-   *
-   * This is the first step of the data scan. All other methods in {@link BatchReadSupport} needs
-   * to take {@link ScanConfig} as an input.
-   */
-  ScanConfigBuilder newScanConfigBuilder();
-
-  /**
-   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
-   */
-  PartitionReaderFactory createReaderFactory(ScanConfig config);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
new file mode 100644
index 0000000..dcb8771
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+
+/**
+ * A mix-in interface for {@link InputPartition}. Continuous input partitions can
+ * implement this interface to provide creating {@link InputPartitionReader} with particular offset.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousInputPartition<T> extends InputPartition<T> {
+  /**
+   * Create an input partition reader with particular offset as its startOffset.
+   *
+   * @param offset offset want to set as the input partition reader's startOffset.
+   */
+  InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
new file mode 100644
index 0000000..da98fab
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source reader that is returned by
+ * {@link ReadSupport#createReader(DataSourceOptions)} or
+ * {@link ReadSupport#createReader(StructType, DataSourceOptions)}.
+ * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
+ * logic is delegated to {@link InputPartition}s, which are returned by
+ * {@link #planInputPartitions()}.
+ *
+ * There are mainly 3 kinds of query optimizations:
+ *   1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
+ *      pruning), etc. Names of these interfaces start with `SupportsPushDown`.
+ *   2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
+ *      Names of these interfaces start with `SupportsReporting`.
+ *   3. Columnar scan if implements {@link SupportsScanColumnarBatch}.
+ *
+ * If an exception was throw when applying any of these query optimizations, the action will fail
+ * and no Spark job will be submitted.
+ *
+ * Spark first applies all operator push-down optimizations that this data source supports. Then
+ * Spark collects information this data source reported for further optimizations. Finally Spark
+ * issues the scan request and does the actual data reading.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceReader {
+
+  /**
+   * Returns the actual schema of this data source reader, which may be different from the physical
+   * schema of the underlying storage, as column pruning or other optimizations may happen.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   */
+  StructType readSchema();
+
+  /**
+   * Returns a list of {@link InputPartition}s. Each {@link InputPartition} is responsible for
+   * creating a data reader to output data of one RDD partition. The number of input partitions
+   * returned here is the same as the number of RDD partitions this scan outputs.
+   *
+   * Note that, this may not be a full scan if the data source reader mixes in other optimization
+   * interfaces like column pruning, filter push-down, etc. These optimizations are applied before
+   * Spark issues the scan request.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   */
+  List<InputPartition<InternalRow>> planInputPartitions();
+}
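
To make the point about readSchema() differing from the physical schema concrete, a hypothetical sketch of a reader that mixes in the column-pruning push-down interface (SupportsPushDownRequiredColumns); Spark calls pruneColumns before it asks for the schema and the partitions. Names are invented; this is not part of the patch.

    import java.util.Collections;
    import java.util.List;

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
    import org.apache.spark.sql.types.StructType;

    // Invented reader whose physical schema is (id, name, payload) but which honours pruning.
    class PrunableReader implements DataSourceReader, SupportsPushDownRequiredColumns {

      private StructType schema = new StructType()
          .add("id", "bigint")
          .add("name", "string")
          .add("payload", "binary");

      @Override
      public void pruneColumns(StructType requiredSchema) {
        // Spark hands over only the columns the query actually needs.
        this.schema = requiredSchema;
      }

      @Override
      public StructType readSchema() {
        // Possibly narrower than the physical schema once pruneColumns has run.
        return schema;
      }

      @Override
      public List<InputPartition<InternalRow>> planInputPartitions() {
        // A real reader would plan splits emitting only the pruned columns;
        // returning no partitions keeps the sketch self-contained.
        return Collections.emptyList();
      }
    }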

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
index 95c30de..f2038d0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
@@ -22,18 +22,18 @@ import java.io.Serializable;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A serializable representation of an input partition returned by
- * {@link ReadSupport#planInputPartitions(ScanConfig)}.
+ * An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
+ * responsible for creating the actual data reader of one RDD partition.
+ * The relationship between {@link InputPartition} and {@link InputPartitionReader}
+ * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
  *
- * Note that {@link InputPartition} will be serialized and sent to executors, then
- * {@link PartitionReader} will be created by
- * {@link PartitionReaderFactory#createReader(InputPartition)} or
- * {@link PartitionReaderFactory#createColumnarReader(InputPartition)} on executors to do
- * the actual reading. So {@link InputPartition} must be serializable while {@link PartitionReader}
- * doesn't need to be.
+ * Note that {@link InputPartition}s will be serialized and sent to executors, then
+ * {@link InputPartitionReader}s will be created on executors to do the actual reading. So
+ * {@link InputPartition} must be serializable while {@link InputPartitionReader} doesn't need to
+ * be.
  */
 @InterfaceStability.Evolving
-public interface InputPartition extends Serializable {
+public interface InputPartition<T> extends Serializable {
 
   /**
    * The preferred locations where the input partition reader returned by this partition can run
@@ -51,4 +51,12 @@ public interface InputPartition extends Serializable {
   default String[] preferredLocations() {
     return new String[0];
   }
+
+  /**
+   * Returns an input partition reader to do the actual reading work.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  InputPartitionReader<T> createPartitionReader();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
new file mode 100644
index 0000000..f3ff7f5
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An input partition reader returned by {@link InputPartition#createPartitionReader()} and is
+ * responsible for outputting data for a RDD partition.
+ *
+ * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
+ * for normal data source readers, {@link org.apache.spark.sql.vectorized.ColumnarBatch} for data
+ * source readers that mix in {@link SupportsScanColumnarBatch}.
+ */
+@InterfaceStability.Evolving
+public interface InputPartitionReader<T> extends Closeable {
+
+  /**
+   * Proceed to next record, returns false if there is no more records.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   *
+   * @throws IOException if failure happens during disk/network IO like reading files.
+   */
+  boolean next() throws IOException;
+
+  /**
+   * Return the current record. This method should return same value until `next` is called.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  T get();
+}
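
Putting the last two interfaces together, a hypothetical sketch of a partition that emits a range of integers: the partition is the serializable plan built on the driver, the reader does the per-task work on an executor, and preferredLocations() keeps its default of "no preference". None of this is part of the patch.

    import java.io.IOException;

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

    // Serialized on the driver and shipped to an executor (InputPartition extends Serializable).
    class RangeInputPartition implements InputPartition<InternalRow> {
      private final int start;
      private final int end;

      RangeInputPartition(int start, int end) {
        this.start = start;
        this.end = end;
      }

      @Override
      public InputPartitionReader<InternalRow> createPartitionReader() {
        // Runs on the executor; the reader itself never needs to be serializable.
        return new InputPartitionReader<InternalRow>() {
          private int current = start - 1;

          @Override
          public boolean next() throws IOException {
            current++;
            return current < end;
          }

          @Override
          public InternalRow get() {
            // One-column row holding the current value.
            return new GenericInternalRow(new Object[] {current});
          }

          @Override
          public void close() throws IOException { /* no resources to release */ }
        };
      }
    }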

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java
deleted file mode 100644
index 04ff8d0..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
- * {@link PartitionReaderFactory#createColumnarReader(InputPartition)}. It's responsible for
- * outputting data for a RDD partition.
- *
- * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
- * for normal data sources, or {@link org.apache.spark.sql.vectorized.ColumnarBatch} for columnar
- * data sources(whose {@link PartitionReaderFactory#supportColumnarReads(InputPartition)}
- * returns true).
- */
-@InterfaceStability.Evolving
-public interface PartitionReader<T> extends Closeable {
-
-  /**
-   * Proceed to next record, returns false if there is no more records.
-   *
-   * @throws IOException if failure happens during disk/network IO like reading files.
-   */
-  boolean next() throws IOException;
-
-  /**
-   * Return the current record. This method should return same value until `next` is called.
-   */
-  T get();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java
deleted file mode 100644
index f35de93..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.io.Serializable;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-
-/**
- * A factory used to create {@link PartitionReader} instances.
- *
- * If Spark fails to execute any methods in the implementations of this interface or in the returned
- * {@link PartitionReader} (by throwing an exception), corresponding Spark task would fail and
- * get retried until hitting the maximum retry times.
- */
-@InterfaceStability.Evolving
-public interface PartitionReaderFactory extends Serializable {
-
-  /**
-   * Returns a row-based partition reader to read data from the given {@link InputPartition}.
-   *
-   * Implementations probably need to cast the input partition to the concrete
-   * {@link InputPartition} class defined for the data source.
-   */
-  PartitionReader<InternalRow> createReader(InputPartition partition);
-
-  /**
-   * Returns a columnar partition reader to read data from the given {@link InputPartition}.
-   *
-   * Implementations probably need to cast the input partition to the concrete
-   * {@link InputPartition} class defined for the data source.
-   */
-  default PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
-    throw new UnsupportedOperationException("Cannot create columnar reader.");
-  }
-
-  /**
-   * Returns true if the given {@link InputPartition} should be read by Spark in a columnar way.
-   * This means, implementations must also implement {@link #createColumnarReader(InputPartition)}
-   * for the input partitions that this method returns true.
-   *
-   * As of Spark 2.4, Spark can only read all input partition in a columnar way, or none of them.
-   * Data source can't mix columnar and row-based partitions. This may be relaxed in future
-   * versions.
-   */
-  default boolean supportColumnarReads(InputPartition partition) {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
deleted file mode 100644
index a58ddb2..0000000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * The base interface for all the batch and streaming read supports. Data 
sources should implement
- * concrete read support interfaces like {@link BatchReadSupport}.
- *
- * If Spark fails to execute any methods in the implementations of this 
interface (by throwing an
- * exception), the read action will fail and no Spark job will be submitted.
- */
-@InterfaceStability.Evolving
-public interface ReadSupport {
-
-  /**
-   * Returns the full schema of this data source, which is usually the 
physical schema of the
-   * underlying storage. This full schema should not be affected by column 
pruning or other
-   * optimizations.
-   */
-  StructType fullSchema();
-
-  /**
-   * Returns a list of {@link InputPartition input partitions}. Each {@link 
InputPartition}
-   * represents a data split that can be processed by one Spark task. The 
number of input
-   * partitions returned here is the same as the number of RDD partitions this 
scan outputs.
-   *
-   * Note that, this may not be a full scan if the data source supports 
optimization like filter
-   * push-down. Implementations should check the input {@link ScanConfig} and 
adjust the resulting
-   * {@link InputPartition input partitions}.
-   */
-  InputPartition[] planInputPartitions(ScanConfig config);
-}
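
A minimal sketch, not from the patch, of the removed ReadSupport contract. SingleSplitReadSupport and SingleSplit are hypothetical names; the ScanConfig argument is ignored here, whereas a real source would honor any pushed operators it carries.

import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

class SingleSplitReadSupport implements ReadSupport {

  // Hypothetical marker partition; a reader factory (not shown) would cast back to it.
  static class SingleSplit implements InputPartition { }

  @Override
  public StructType fullSchema() {
    // Physical schema of the underlying storage, unaffected by column pruning.
    return new StructType().add("value", DataTypes.LongType);
  }

  @Override
  public InputPartition[] planInputPartitions(ScanConfig config) {
    // One Spark task will process this single split; a real source would check
    // the ScanConfig for pushed filters / pruned columns before planning splits.
    return new InputPartition[] { new SingleSplit() };
  }
}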

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
deleted file mode 100644
index 7462ce2..0000000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * An interface that carries query specific information for the data scanning 
job, like operator
- * pushdown information and streaming query offsets. This is defined as an 
empty interface, and data
- * sources should define their own {@link ScanConfig} classes.
- *
- * For APIs that take a {@link ScanConfig} as input, like
- * {@link ReadSupport#planInputPartitions(ScanConfig)},
- * {@link BatchReadSupport#createReaderFactory(ScanConfig)} and
- * {@link SupportsReportStatistics#estimateStatistics(ScanConfig)}, 
implementations mostly need to
- * cast the input {@link ScanConfig} to the concrete {@link ScanConfig} class 
of the data source.
- */
-@InterfaceStability.Evolving
-public interface ScanConfig {
-
-  /**
-   * Returns the actual schema of this data source reader, which may be 
different from the physical
-   * schema of the underlying storage, as column pruning or other 
optimizations may happen.
-   *
-   * If this method fails (by throwing an exception), the action will fail and 
no Spark job will be
-   * submitted.
-   */
-  StructType readSchema();
-}
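
A sketch of a concrete ScanConfig under the removed contract above, assuming a source that only tracks column pruning; the class name and field are hypothetical.

import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.types.StructType;

class PrunedScanConfig implements ScanConfig {
  private final StructType prunedSchema;

  PrunedScanConfig(StructType prunedSchema) {
    this.prunedSchema = prunedSchema;
  }

  @Override
  public StructType readSchema() {
    // The schema the scan will actually produce, after column pruning.
    return prunedSchema;
  }
}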

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
deleted file mode 100644
index 4c0eedf..0000000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An interface for building the {@link ScanConfig}. Implementations can mixin 
those
- * SupportsPushDownXYZ interfaces to do operator pushdown, and keep the 
operator pushdown result in
- * the returned {@link ScanConfig}.
- */
-@InterfaceStability.Evolving
-public interface ScanConfigBuilder {
-  ScanConfig build();
-}
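
A sketch of the removed builder contract, again with hypothetical names: the builder accumulates scan state (here just the schema to read) and freezes it into an immutable ScanConfig in build(). A pruning-capable source would additionally mix the SupportsPushDownXYZ interfaces into its builder, as the Javadoc above suggests.

import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
import org.apache.spark.sql.types.StructType;

class PruningScanConfigBuilder implements ScanConfigBuilder {
  private StructType schemaToRead;

  PruningScanConfigBuilder(StructType fullSchema) {
    this.schemaToRead = fullSchema;  // default: read everything
  }

  @Override
  public ScanConfig build() {
    // Freeze the accumulated state into an immutable ScanConfig.
    StructType frozen = schemaToRead;
    return new ScanConfig() {
      @Override
      public StructType readSchema() { return frozen; }
    };
  }
}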

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
index 44799c7..031c7a7 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.InterfaceStability;
 
 /**
  * An interface to represent statistics for a data source, which is returned by
- * {@link SupportsReportStatistics#estimateStatistics(ScanConfig)}.
+ * {@link SupportsReportStatistics#estimateStatistics()}.
  */
 @InterfaceStability.Evolving
 public interface Statistics {

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
index 5e7985f..7e0020f 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
@@ -21,11 +21,11 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.Filter;
 
 /**
- * A mix-in interface for {@link ScanConfigBuilder}. Data sources can 
implement this interface to
- * push down filters to the data source and reduce the size of the data to be 
read.
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can 
implement this
+ * interface to push down filters to the data source and reduce the size of 
the data to be read.
  */
 @InterfaceStability.Evolving
-public interface SupportsPushDownFilters extends ScanConfigBuilder {
+public interface SupportsPushDownFilters extends DataSourceReader {
 
   /**
    * Pushes down filters, and returns filters that need to be evaluated after 
scanning.
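
A sketch of a reader using the restored, DataSourceReader-based mix-in. FilteringReader is hypothetical; pushedFilters() and the DataSourceReader methods readSchema() and planInputPartitions() are assumed from the restored interfaces rather than shown in this hunk, and only GreaterThan filters are accepted, purely for illustration.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

class FilteringReader implements SupportsPushDownFilters {
  private final List<Filter> pushed = new ArrayList<>();

  @Override
  public StructType readSchema() {
    return new StructType().add("value", DataTypes.IntegerType);
  }

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    // Accept only GreaterThan filters; everything else is handed back to Spark
    // so it can be evaluated after scanning.
    List<Filter> postScan = new ArrayList<>();
    for (Filter f : filters) {
      if (f instanceof GreaterThan) {
        pushed.add(f);
      } else {
        postScan.add(f);
      }
    }
    return postScan.toArray(new Filter[0]);
  }

  @Override
  public Filter[] pushedFilters() {
    return pushed.toArray(new Filter[0]);
  }

  @Override
  public List<InputPartition<InternalRow>> planInputPartitions() {
    // Split planning elided in this sketch; a real reader would translate the
    // pushed filters into per-split scan predicates here.
    return Collections.emptyList();
  }
}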

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
index edb1649..427b4d0 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
@@ -21,12 +21,12 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link ScanConfigBuilder}. Data sources can 
implement this
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can 
implement this
  * interface to push down required columns to the data source and only read 
these columns during
  * scan to reduce the size of the data to be read.
  */
 @InterfaceStability.Evolving
-public interface SupportsPushDownRequiredColumns extends ScanConfigBuilder {
+public interface SupportsPushDownRequiredColumns extends DataSourceReader {
 
   /**
    * Applies column pruning w.r.t. the given requiredSchema.
@@ -35,8 +35,8 @@ public interface SupportsPushDownRequiredColumns extends 
ScanConfigBuilder {
    * also OK to do the pruning partially, e.g., a data source may not be able 
to prune nested
    * fields, and only prune top-level columns.
    *
-   * Note that, {@link ScanConfig#readSchema()} implementation should take 
care of the column
-   * pruning applied here.
+   * Note that, data source readers should update {@link 
DataSourceReader#readSchema()} after
+   * applying column pruning.
    */
   void pruneColumns(StructType requiredSchema);
 }
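
A sketch of the column-pruning contract described above, with a hypothetical two-column source: the pruned schema is stored by pruneColumns() and returned from readSchema(), and split planning is elided.

import java.util.Collections;
import java.util.List;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

class PruningReader implements SupportsPushDownRequiredColumns {
  // Hypothetical full schema of the underlying storage.
  private final StructType fullSchema =
      new StructType().add("id", DataTypes.LongType).add("name", DataTypes.StringType);

  // Starts as the full schema; replaced when Spark pushes column pruning.
  private StructType requiredSchema = fullSchema;

  @Override
  public void pruneColumns(StructType requiredSchema) {
    this.requiredSchema = requiredSchema;
  }

  @Override
  public StructType readSchema() {
    // Per the note above, readSchema() must reflect the pruning applied here.
    return requiredSchema;
  }

  @Override
  public List<InputPartition<InternalRow>> planInputPartitions() {
    return Collections.emptyList();  // split planning elided in this sketch
  }
}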

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
index db62cd4..6b60da7 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
@@ -21,17 +21,17 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
 
 /**
- * A mix in interface for {@link BatchReadSupport}. Data sources can implement 
this interface to
- * report data partitioning and try to avoid shuffle at Spark side.
+ * A mix in interface for {@link DataSourceReader}. Data source readers can 
implement this
+ * interface to report data partitioning and try to avoid shuffle at Spark 
side.
  *
- * Note that, when a {@link ReadSupport} implementation creates exactly one 
{@link InputPartition},
- * Spark may avoid adding a shuffle even if the reader does not implement this 
interface.
+ * Note that, when the reader creates exactly one {@link InputPartition}, 
Spark may avoid
+ * adding a shuffle even if the reader does not implement this interface.
  */
 @InterfaceStability.Evolving
-public interface SupportsReportPartitioning extends ReadSupport {
+public interface SupportsReportPartitioning extends DataSourceReader {
 
   /**
    * Returns the output data partitioning that this reader guarantees.
    */
-  Partitioning outputPartitioning(ScanConfig config);
+  Partitioning outputPartitioning();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
index 1831488..44d0ce3 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
@@ -20,18 +20,18 @@ package org.apache.spark.sql.sources.v2.reader;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A mix in interface for {@link BatchReadSupport}. Data sources can implement 
this interface to
- * report statistics to Spark.
+ * A mix in interface for {@link DataSourceReader}. Data source readers can 
implement this
+ * interface to report statistics to Spark.
  *
  * As of Spark 2.4, statistics are reported to the optimizer before any 
operator is pushed to the
- * data source. Implementations that return more accurate statistics based on 
pushed operators will
- * not improve query performance until the planner can push operators before 
getting stats.
+ * DataSourceReader. Implementations that return more accurate statistics 
based on pushed operators
+ * will not improve query performance until the planner can push operators 
before getting stats.
  */
 @InterfaceStability.Evolving
-public interface SupportsReportStatistics extends ReadSupport {
+public interface SupportsReportStatistics extends DataSourceReader {
 
   /**
-   * Returns the estimated statistics of this data source scan.
+   * Returns the estimated statistics of this data source.
    */
-  Statistics estimateStatistics(ScanConfig config);
+  Statistics estimateStatistics();
 }
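
A sketch of a reader reporting fixed statistics. StatsReportingReader is hypothetical, and the Statistics accessors used here (sizeInBytes() and numRows() returning OptionalLong) are assumed from the interface this Javadoc references rather than shown in this diff.

import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.Statistics;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

class StatsReportingReader implements SupportsReportStatistics {

  @Override
  public StructType readSchema() {
    return new StructType().add("value", DataTypes.LongType);
  }

  @Override
  public List<InputPartition<InternalRow>> planInputPartitions() {
    return Collections.emptyList();  // split planning elided in this sketch
  }

  @Override
  public Statistics estimateStatistics() {
    // Report a fixed size and row count; a real source would consult its metadata.
    return new Statistics() {
      @Override
      public OptionalLong sizeInBytes() { return OptionalLong.of(1L << 20); }

      @Override
      public OptionalLong numRows() { return OptionalLong.of(10_000L); }
    };
  }
}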

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
new file mode 100644
index 0000000..f4da686
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can 
implement this
+ * interface to output {@link ColumnarBatch} and make the scan faster.
+ */
+@InterfaceStability.Evolving
+public interface SupportsScanColumnarBatch extends DataSourceReader {
+  @Override
+  default List<InputPartition<InternalRow>> planInputPartitions() {
+    throw new IllegalStateException(
+      "planInputPartitions not supported by default within 
SupportsScanColumnarBatch.");
+  }
+
+  /**
+   * Similar to {@link DataSourceReader#planInputPartitions()}, but returns 
columnar data
+   * in batches.
+   */
+  List<InputPartition<ColumnarBatch>> planBatchInputPartitions();
+
+  /**
+   * Returns true if the concrete data source reader can read data in batch 
according to the scan
+   * properties like required columns, pushes filters, etc. It's possible that 
the implementation
+   * can only support some certain columns with certain types. Users can 
overwrite this method and
+   * {@link #planInputPartitions()} to fallback to normal read path under some 
conditions.
+   */
+  default boolean enableBatchRead() {
+    return true;
+  }
+}
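
A sketch of a columnar reader under the interface added back above. IntBatchReader and ValuesPartition are hypothetical; InputPartition#createPartitionReader() and the InputPartitionReader methods are assumed from the restored API, and OnHeapColumnVector is an internal Spark class used here purely to build a ColumnarBatch for illustration.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class IntBatchReader implements SupportsScanColumnarBatch {

  @Override
  public StructType readSchema() {
    return new StructType().add("value", DataTypes.IntegerType);
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    return Arrays.asList(new ValuesPartition(0, 1024), new ValuesPartition(1024, 2048));
  }

  // enableBatchRead() keeps its default of true, so the row-based fallback
  // planInputPartitions() is never called and can keep its throwing default.

  // One partition producing a single batch of consecutive integers.
  static class ValuesPartition implements InputPartition<ColumnarBatch> {
    private final int start;
    private final int end;
    ValuesPartition(int start, int end) { this.start = start; this.end = end; }

    @Override
    public InputPartitionReader<ColumnarBatch> createPartitionReader() {
      return new InputPartitionReader<ColumnarBatch>() {
        private boolean consumed = false;

        @Override
        public boolean next() throws IOException {
          boolean hasNext = !consumed;
          consumed = true;
          return hasNext;
        }

        @Override
        public ColumnarBatch get() {
          int numRows = end - start;
          OnHeapColumnVector col = new OnHeapColumnVector(numRows, DataTypes.IntegerType);
          for (int i = 0; i < numRows; i++) {
            col.putInt(i, start + i);
          }
          ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] { col });
          batch.setNumRows(numRows);
          return batch;
        }

        @Override
        public void close() throws IOException { }
      };
    }
  }
}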

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
index 6764d4b..38ca5fc 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
@@ -18,12 +18,12 @@
 package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.PartitionReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 /**
  * A concrete implementation of {@link Distribution}. Represents a 
distribution where records that
  * share the same values for the {@link #clusteredColumns} will be produced by 
the same
- * {@link PartitionReader}.
+ * {@link InputPartitionReader}.
  */
 @InterfaceStability.Evolving
 public class ClusteredDistribution implements Distribution {

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
index 364a3f5..5e32ba6 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
@@ -18,14 +18,14 @@
 package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.PartitionReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 /**
  * An interface to represent data distribution requirement, which specifies 
how the records should
- * be distributed among the data partitions (one {@link PartitionReader} 
outputs data for one
+ * be distributed among the data partitions (one {@link InputPartitionReader} 
outputs data for one
  * partition).
  * Note that this interface has nothing to do with the data ordering inside one
- * partition(the output records of a single {@link PartitionReader}).
+ * partition(the output records of a single {@link InputPartitionReader}).
  *
  * The instance of this interface is created and provided by Spark, then 
consumed by
  * {@link Partitioning#satisfy(Distribution)}. This means data source 
developers don't need to

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
index fb0b6f1..f460f6b 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
@@ -19,13 +19,12 @@ package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.ScanConfig;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
 
 /**
  * An interface to represent the output data partitioning for a data source, 
which is returned by
- * {@link SupportsReportPartitioning#outputPartitioning(ScanConfig)}. Note 
that this should work
- * like a snapshot. Once created, it should be deterministic and always report 
the same number of
+ * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this 
should work like a
+ * snapshot. Once created, it should be deterministic and always report the 
same number of
  * partitions and the same "satisfy" result for a certain distribution.
  */
 @InterfaceStability.Evolving
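
A sketch of a Partitioning that a SupportsReportPartitioning reader could return from outputPartitioning(). KeyClusteredPartitioning is hypothetical; numPartitions() is assumed alongside the satisfy(Distribution) method referenced above, and ClusteredDistribution's public clusteredColumns field is taken from its Javadoc earlier in this diff.

import java.util.Arrays;
import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

class KeyClusteredPartitioning implements Partitioning {
  private final String[] clusterKeys;  // columns the source guarantees are co-located
  private final int numPartitions;

  KeyClusteredPartitioning(String[] clusterKeys, int numPartitions) {
    this.clusterKeys = clusterKeys;
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public boolean satisfy(Distribution distribution) {
    // Data co-located by clusterKeys satisfies a ClusteredDistribution whose
    // clustering columns include every one of those keys; anything else may
    // still require a shuffle on the Spark side.
    if (distribution instanceof ClusteredDistribution) {
      ClusteredDistribution c = (ClusteredDistribution) distribution;
      return Arrays.asList(c.clusteredColumns).containsAll(Arrays.asList(clusterKeys));
    }
    return false;
  }
}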

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
new file mode 100644
index 0000000..7b0ba0b
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+
+/**
+ * A variation on {@link InputPartitionReader} for use with streaming in 
continuous processing mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousInputPartitionReader<T> extends 
InputPartitionReader<T> {
+    /**
+     * Get the offset of the current record, or the start offset if no records 
have been read.
+     *
+     * The execution engine will call this method along with get() to keep 
track of the current
+     * offset. When an epoch ends, the offset of the previous record in each 
partition will be saved
+     * as a restart checkpoint.
+     */
+    PartitionOffset getOffset();
+}
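
A sketch of the continuous reader contract added back above. CounterContinuousReader and CounterOffset are hypothetical, next() and close() are assumed from InputPartitionReader (only get() and getOffset() are named in this hunk), and the reader simply emits an ever-increasing counter whose index doubles as the restart offset.

import java.io.IOException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

class CounterContinuousReader implements ContinuousInputPartitionReader<InternalRow> {

  // Hypothetical offset type: the index of the most recently returned record.
  static class CounterOffset implements PartitionOffset {
    final long index;
    CounterOffset(long index) { this.index = index; }
  }

  private long current;

  CounterContinuousReader(long startIndex) {
    // Before any record is read, getOffset() reports the start offset.
    this.current = startIndex - 1;
  }

  @Override
  public boolean next() throws IOException {
    current += 1;  // an unbounded source: there is always another record
    return true;
  }

  @Override
  public InternalRow get() {
    return new GenericInternalRow(new Object[] { current });
  }

  @Override
  public PartitionOffset getOffset() {
    // Saved by the engine at epoch boundaries as a restart checkpoint.
    return new CounterOffset(current);
  }

  @Override
  public void close() throws IOException { }
}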

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
deleted file mode 100644
index 9101c8a..0000000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.PartitionReader;
-
-/**
- * A variation on {@link PartitionReader} for use with continuous streaming 
processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousPartitionReader<T> extends PartitionReader<T> {
-
-  /**
-   * Get the offset of the current record, or the start offset if no records 
have been read.
-   *
-   * The execution engine will call this method along with get() to keep track 
of the current
-   * offset. When an epoch ends, the offset of the previous record in each 
partition will be saved
-   * as a restart checkpoint.
-   */
-  PartitionOffset getOffset();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java
deleted file mode 100644
index 2d9f1ca..0000000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-
-/**
- * A variation on {@link PartitionReaderFactory} that returns {@link 
ContinuousPartitionReader}
- * instead of {@link org.apache.spark.sql.sources.v2.reader.PartitionReader}. 
It's used for
- * continuous streaming processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousPartitionReaderFactory extends 
PartitionReaderFactory {
-  @Override
-  ContinuousPartitionReader<InternalRow> createReader(InputPartition 
partition);
-
-  @Override
-  default ContinuousPartitionReader<ColumnarBatch> 
createColumnarReader(InputPartition partition) {
-    throw new UnsupportedOperationException("Cannot create columnar reader.");
-  }
-}

