http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index c7b74f3..946b636 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.io._
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.{Files, Paths}
-import java.util.{Locale, Optional, Properties}
+import java.util.{Locale, Properties}
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -44,11 +44,9 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
-import org.apache.spark.sql.types.StructType
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {
 
@@ -118,14 +116,16 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
         query.nonEmpty,
         "Cannot add data when there is no query for finding the active kafka source")
 
-      val sources = {
+      val sources: Seq[BaseStreamingSource] = {
         query.get.logicalPlan.collect {
           case StreamingExecutionRelation(source: KafkaSource, _) => source
-          case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
+          case StreamingExecutionRelation(source: KafkaMicroBatchReadSupport, _) => source
         } ++ (query.get.lastExecution match {
           case null => Seq()
           case e => e.logical.collect {
-            case StreamingDataSourceV2Relation(_, _, _, reader: KafkaContinuousReader) => reader
+            case r: StreamingDataSourceV2Relation
+                if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
+              r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
           }
         })
       }.distinct
@@ -650,7 +650,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       makeSureGetOffsetCalled,
       AssertOnQuery { query =>
         query.logicalPlan.collect {
-          case StreamingExecutionRelation(_: KafkaMicroBatchReader, _) => true
+          case StreamingExecutionRelation(_: KafkaMicroBatchReadSupport, _) => true
         }.nonEmpty
       }
     )
@@ -675,17 +675,16 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
           "kafka.bootstrap.servers" -> testUtils.brokerAddress,
           "subscribe" -> topic
         ) ++ Option(minPartitions).map { p => "minPartitions" -> p}
-        val reader = provider.createMicroBatchReader(
-          Optional.empty[StructType], dir.getAbsolutePath, new DataSourceOptions(options.asJava))
-        reader.setOffsetRange(
-          Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
-          Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
-        )
-        val factories = reader.planInputPartitions().asScala
+        val readSupport = provider.createMicroBatchReadSupport(
+          dir.getAbsolutePath, new DataSourceOptions(options.asJava))
+        val config = readSupport.newScanConfigBuilder(
+          KafkaSourceOffset(Map(tp -> 0L)),
+          KafkaSourceOffset(Map(tp -> 100L))).build()
+        val inputPartitions = readSupport.planInputPartitions(config)
           .map(_.asInstanceOf[KafkaMicroBatchInputPartition])
-        withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
-          assert(factories.size == numPartitionsGenerated)
-          factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
+        withClue(s"minPartitions = $minPartitions generated factories $inputPartitions\n\t") {
+          assert(inputPartitions.size == numPartitionsGenerated)
+          inputPartitions.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
new file mode 100644
index 0000000..f403dc6
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data reading ability for batch processing.
+ *
+ * This interface is used to create {@link BatchReadSupport} instances when end users run
+ * {@code SparkSession.read.format(...).option(...).load()}.
+ */
+@InterfaceStability.Evolving
+public interface BatchReadSupportProvider extends DataSourceV2 {
+
+  /**
+   * Creates a {@link BatchReadSupport} instance to load the data from this data source with a user
+   * specified schema, which is called by Spark at the beginning of each batch query.
+   *
+   * Spark will call this method at the beginning of each batch query to create a
+   * {@link BatchReadSupport} instance.
+   *
+   * By default this method throws {@link UnsupportedOperationException}; implementations should
+   * override it to handle the user-specified schema.
+   *
+   * @param schema the user specified schema.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  default BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) {
+    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
+  }
+
+  /**
+   * Creates a {@link BatchReadSupport} instance to scan the data from this data source, which is
+   * called by Spark at the beginning of each batch query.
+   *
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  BatchReadSupport createBatchReadSupport(DataSourceOptions options);
+}
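
For illustration, a data source opts into batch reads by mixing this interface into its
DataSourceV2 implementation. A minimal sketch under hypothetical names (ExampleSource,
ExampleBatchReadSupport and the "path" option are not part of this change):

    import org.apache.spark.sql.sources.v2.*;
    import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;

    public class ExampleSource implements DataSourceV2, BatchReadSupportProvider {
      @Override
      public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
        // "path" is a hypothetical option; DataSourceOptions keys are case-insensitive.
        String path = options.get("path").orElse("/tmp/example");
        return new ExampleBatchReadSupport(path);  // hypothetical BatchReadSupport implementation
      }
    }

End users would then read it with spark.read.format(...).load() as usual.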

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
new file mode 100644
index 0000000..bd10c33
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability for batch processing.
+ *
+ * This interface is used to create {@link BatchWriteSupport} instances when end users run
+ * {@code Dataset.write.format(...).option(...).save()}.
+ */
+@InterfaceStability.Evolving
+public interface BatchWriteSupportProvider extends DataSourceV2 {
+
+  /**
+   * Creates an optional {@link BatchWriteSupport} instance to save the data to this data source,
+   * which is called by Spark at the beginning of each batch query.
+   *
+   * Data sources can return None if there is no writing needed to be done according to the save
+   * mode.
+   *
+   * @param queryId A unique string for the writing query. It's possible that there are many
+   *                writing queries running at the same time, and the returned
+   *                {@link BatchWriteSupport} can use this id to distinguish itself from others.
+   * @param schema the schema of the data to be written.
+   * @param mode the save mode which determines what to do when the data are already in this data
+   *             source, please refer to {@link SaveMode} for more details.
+   * @param options the options for the returned data source writer, which is an immutable
+   *                case-insensitive string-to-string map.
+   * @return a write support to write data to this data source.
+   */
+  Optional<BatchWriteSupport> createBatchWriteSupport(
+      String queryId,
+      StructType schema,
+      SaveMode mode,
+      DataSourceOptions options);
+}
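
A sketch of the batch write-side counterpart under the same caveats (ExampleSink,
ExampleBatchWriteSupport and the existence check are hypothetical):

    import java.util.Optional;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.sources.v2.*;
    import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
    import org.apache.spark.sql.types.StructType;

    public class ExampleSink implements DataSourceV2, BatchWriteSupportProvider {
      @Override
      public Optional<BatchWriteSupport> createBatchWriteSupport(
          String queryId, StructType schema, SaveMode mode, DataSourceOptions options) {
        if (mode == SaveMode.Ignore && targetAlreadyExists(options)) {
          return Optional.empty();  // no writing needed for this save mode
        }
        return Optional.of(new ExampleBatchWriteSupport(queryId, schema));  // hypothetical
      }

      private boolean targetAlreadyExists(DataSourceOptions options) {
        return false;  // hypothetical check against the target storage
      }
    }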

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
deleted file mode 100644
index 7df5a45..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability for continuous stream processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousReadSupport extends DataSourceV2 {
-  /**
-   * Creates a {@link ContinuousReader} to scan the data from this data source.
-   *
-   * @param schema the user provided schema, or empty() if none was provided
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
-   *                           recovery. Readers for the same logical source in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  ContinuousReader createContinuousReader(
-    Optional<StructType> schema,
-    String checkpointLocation,
-    DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
new file mode 100644
index 0000000..824c290
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data reading ability for continuous stream processing.
+ *
+ * This interface is used to create {@link ContinuousReadSupport} instances when end users run
+ * {@code SparkSession.readStream.format(...).option(...).load()} with a continuous trigger.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupportProvider extends DataSourceV2 {
+
+  /**
+   * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
+   * source with a user specified schema, which is called by Spark at the beginning of each
+   * continuous streaming query.
+   *
+   * By default this method throws {@link UnsupportedOperationException}; implementations should
+   * override it to handle the user-specified schema.
+   *
+   * @param schema the user provided schema.
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Readers for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  default ContinuousReadSupport createContinuousReadSupport(
+      StructType schema,
+      String checkpointLocation,
+      DataSourceOptions options) {
+    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
+  }
+
+  /**
+   * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
+   * source, which is called by Spark at the beginning of each continuous streaming query.
+   *
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Readers for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  ContinuousReadSupport createContinuousReadSupport(
+      String checkpointLocation,
+      DataSourceOptions options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
index 6234071..6e31e84 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
@@ -22,9 +22,13 @@ import org.apache.spark.annotation.InterfaceStability;
 /**
  * The base interface for data source v2. Implementations must have a public, 
0-arg constructor.
  *
- * Note that this is an empty interface. Data source implementations should mix-in at least one of
- * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just
- * a dummy data source which is un-readable/writable.
+ * Note that this is an empty interface. Data source implementations must mix in interfaces such as
+ * {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, which can provide
+ * batch or streaming read/write support instances. Otherwise it's just a dummy data source which
+ * is un-readable/writable.
+ *
+ * If Spark fails to execute any methods in the implementations of this interface (by throwing an
+ * exception), the read action will fail and no Spark job will be submitted.
  */
 @InterfaceStability.Evolving
 public interface DataSourceV2 {}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
deleted file mode 100644
index 7f4a2c9..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide streaming micro-batch data reading ability.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchReadSupport extends DataSourceV2 {
-  /**
-   * Creates a {@link MicroBatchReader} to read batches of data from this data source in a
-   * streaming query.
-   *
-   * The execution engine will create a micro-batch reader at the start of a streaming query,
-   * alternate calls to setOffsetRange and planInputPartitions for each batch to process, and
-   * then call stop() when the execution is complete. Note that a single query may have multiple
-   * executions due to restart or failure recovery.
-   *
-   * @param schema the user provided schema, or empty() if none was provided
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
-   *                           recovery. Readers for the same logical source in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  MicroBatchReader createMicroBatchReader(
-      Optional<StructType> schema,
-      String checkpointLocation,
-      DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
new file mode 100644
index 0000000..61c08e7
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data reading ability for micro-batch stream processing.
+ *
+ * This interface is used to create {@link MicroBatchReadSupport} instances when end users run
+ * {@code SparkSession.readStream.format(...).option(...).load()} with a micro-batch trigger.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupportProvider extends DataSourceV2 {
+
+  /**
+   * Creates a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
+   * source with a user specified schema, which is called by Spark at the beginning of each
+   * micro-batch streaming query.
+   *
+   * By default this method throws {@link UnsupportedOperationException}; implementations should
+   * override it to handle the user-specified schema.
+   *
+   * @param schema the user provided schema.
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Readers for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  default MicroBatchReadSupport createMicroBatchReadSupport(
+      StructType schema,
+      String checkpointLocation,
+      DataSourceOptions options) {
+    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
+  }
+
+  /**
+   * Creates a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
+   * source, which is called by Spark at the beginning of each micro-batch streaming query.
+   *
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Readers for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  MicroBatchReadSupport createMicroBatchReadSupport(
+      String checkpointLocation,
+      DataSourceOptions options);
+}
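
The Kafka suite above exercises this hook directly via provider.createMicroBatchReadSupport(...).
A generic sketch of a source that supports both streaming trigger modes (the Example* read
support classes are hypothetical):

    import org.apache.spark.sql.sources.v2.*;
    import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
    import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport;

    public class ExampleStreamSource
        implements DataSourceV2, MicroBatchReadSupportProvider, ContinuousReadSupportProvider {

      @Override
      public MicroBatchReadSupport createMicroBatchReadSupport(
          String checkpointLocation, DataSourceOptions options) {
        // The checkpoint location is stable across restarts of the same query.
        return new ExampleMicroBatchReadSupport(checkpointLocation, options);  // hypothetical
      }

      @Override
      public ContinuousReadSupport createContinuousReadSupport(
          String checkpointLocation, DataSourceOptions options) {
        return new ExampleContinuousReadSupport(checkpointLocation, options);  // hypothetical
      }
    }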

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
deleted file mode 100644
index 80ac08e..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability and scan the data from the data source.
- */
-@InterfaceStability.Evolving
-public interface ReadSupport extends DataSourceV2 {
-
-  /**
-   * Creates a {@link DataSourceReader} to scan the data from this data source.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   *
-   * @param schema the user specified schema.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   *
-   * By default this method throws {@link UnsupportedOperationException}, implementations should
-   * override this method to handle user specified schema.
-   */
-  default DataSourceReader createReader(StructType schema, DataSourceOptions options) {
-    String name;
-    if (this instanceof DataSourceRegister) {
-      name = ((DataSourceRegister) this).shortName();
-    } else {
-      name = this.getClass().getName();
-    }
-    throw new UnsupportedOperationException(name + " does not support user specified schema");
-  }
-
-  /**
-   * Creates a {@link DataSourceReader} to scan the data from this data source.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   *
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  DataSourceReader createReader(DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
index 9d66805..bbe430e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
@@ -27,10 +27,11 @@ import org.apache.spark.annotation.InterfaceStability;
 @InterfaceStability.Evolving
 public interface SessionConfigSupport extends DataSourceV2 {
 
-    /**
-     * Key prefix of the session configs to propagate. Spark will extract all session configs that
-     * starts with `spark.datasource.$keyPrefix`, turn `spark.datasource.$keyPrefix.xxx -&gt; yyy`
-     * into `xxx -&gt; yyy`, and propagate them to all data source operations in this session.
-     */
-    String keyPrefix();
+  /**
+   * Key prefix of the session configs to propagate, which is usually the data source name. Spark
+   * will extract all session configs that start with `spark.datasource.$keyPrefix`, turn
+   * `spark.datasource.$keyPrefix.xxx -&gt; yyy` into `xxx -&gt; yyy`, and propagate them to all
+   * data source operations in this session.
+   */
+  String keyPrefix();
 }
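
For example, with keyPrefix() returning "example", setting
spark.conf.set("spark.datasource.example.region", "us-east") makes the option region=us-east
visible to every read and write of that source in the session. A minimal implementation (the
"example" prefix and class name are hypothetical):

    public class ExampleSource implements DataSourceV2, SessionConfigSupport {
      @Override
      public String keyPrefix() {
        // spark.datasource.example.xxx=yyy in the session conf arrives as option xxx=yyy
        return "example";
      }
    }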

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamWriteSupport.java
deleted file mode 100644
index a77b014..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamWriteSupport.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for structured streaming.
- */
-@InterfaceStability.Evolving
-public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
-
-    /**
-     * Creates an optional {@link StreamWriter} to save the data to this data source. Data
-     * sources can return None if there is no writing needed to be done.
-     *
-     * @param queryId A unique string for the writing query. It's possible that there are many
-     *                writing queries running at the same time, and the returned
-     *                {@link DataSourceWriter} can use this id to distinguish itself from others.
-     * @param schema the schema of the data to be written.
-     * @param mode the output mode which determines what successive epoch output means to this
-     *             sink, please refer to {@link OutputMode} for more details.
-     * @param options the options for the returned data source writer, which is an immutable
-     *                case-insensitive string-to-string map.
-     */
-    StreamWriter createStreamWriter(
-        String queryId,
-        StructType schema,
-        OutputMode mode,
-        DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
new file mode 100644
index 0000000..f9ca85d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability for structured streaming.
+ *
+ * This interface is used to create {@link StreamingWriteSupport} instances when end users run
+ * {@code Dataset.writeStream.format(...).option(...).start()}.
+ */
+@InterfaceStability.Evolving
+public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {
+
+  /**
+   * Creates a {@link StreamingWriteSupport} instance to save the data to this data source, which is
+   * called by Spark at the beginning of each streaming query.
+   *
+   * @param queryId A unique string for the writing query. It's possible that there are many
+   *                writing queries running at the same time, and the returned
+   *                {@link StreamingWriteSupport} can use this id to distinguish itself from others.
+   * @param schema the schema of the data to be written.
+   * @param mode the output mode which determines what successive epoch output means to this
+   *             sink, please refer to {@link OutputMode} for more details.
+   * @param options the options for the returned data source writer, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  StreamingWriteSupport createStreamingWriteSupport(
+      String queryId,
+      StructType schema,
+      OutputMode mode,
+      DataSourceOptions options);
+}
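
A sketch of a streaming sink built on this interface (ExampleStreamingWriteSupport and the class
name are hypothetical):

    import org.apache.spark.sql.sources.v2.*;
    import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
    import org.apache.spark.sql.streaming.OutputMode;
    import org.apache.spark.sql.types.StructType;

    public class ExampleStreamSink implements DataSourceV2, StreamingWriteSupportProvider {
      @Override
      public StreamingWriteSupport createStreamingWriteSupport(
          String queryId, StructType schema, OutputMode mode, DataSourceOptions options) {
        // queryId distinguishes concurrent streaming queries writing through this sink.
        return new ExampleStreamingWriteSupport(queryId, schema, mode);  // hypothetical
      }
    }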

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
deleted file mode 100644
index 048787a..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability and save the data to the data source.
- */
-@InterfaceStability.Evolving
-public interface WriteSupport extends DataSourceV2 {
-
-  /**
-   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
-   * sources can return None if there is no writing needed to be done according to the save mode.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   *
-   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
-   *                  jobs running at the same time, and the returned {@link DataSourceWriter} can
-   *                  use this job id to distinguish itself from other jobs.
-   * @param schema the schema of the data to be written.
-   * @param mode the save mode which determines what to do when the data are already in this data
-   *             source, please refer to {@link SaveMode} for more details.
-   * @param options the options for the returned data source writer, which is an immutable
-   *                case-insensitive string-to-string map.
-   * @return a writer to append data to this data source
-   */
-  Optional<DataSourceWriter> createWriter(
-      String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
new file mode 100644
index 0000000..452ee86
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface that defines how to load the data from data source for batch processing.
+ *
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch
+ * query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}.
+ * The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
+ * {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader
+ * factory to scan data from the data source with a Spark job.
+ */
+@InterfaceStability.Evolving
+public interface BatchReadSupport extends ReadSupport {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
+   * {@link ScanConfig} for each data scanning job.
+   *
+   * The builder can take some query-specific information to do operator pushdown, and keep this
+   * information in the created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link BatchReadSupport} need
+   * to take {@link ScanConfig} as an input.
+   */
+  ScanConfigBuilder newScanConfigBuilder();
+
+  /**
+   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
+   */
+  PartitionReaderFactory createReaderFactory(ScanConfig config);
+}
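
The resulting driver-side sequence mirrors what the Kafka micro-batch test above does for
streaming. A rough sketch, assuming `support` is an already-created BatchReadSupport and that
planInputPartitions(ScanConfig), inherited from ReadSupport, returns an array as the Kafka
test's usage suggests:

    // 1. Build a ScanConfig; operator pushdown results are kept in it.
    ScanConfig config = support.newScanConfigBuilder().build();
    // 2. Plan the input partitions for this scan (inherited from ReadSupport).
    InputPartition[] partitions = support.planInputPartitions(config);
    // 3. Create the factory that produces one PartitionReader per partition on executors.
    PartitionReaderFactory readerFactory = support.createReaderFactory(config);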

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
deleted file mode 100644
index dcb8771..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
-
-/**
- * A mix-in interface for {@link InputPartition}. Continuous input partitions can
- * implement this interface to provide creating {@link InputPartitionReader} with particular offset.
- */
-@InterfaceStability.Evolving
-public interface ContinuousInputPartition<T> extends InputPartition<T> {
-  /**
-   * Create an input partition reader with particular offset as its startOffset.
-   *
-   * @param offset offset want to set as the input partition reader's startOffset.
-   */
-  InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
deleted file mode 100644
index da98fab..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.util.List;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A data source reader that is returned by
- * {@link ReadSupport#createReader(DataSourceOptions)} or
- * {@link ReadSupport#createReader(StructType, DataSourceOptions)}.
- * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
- * logic is delegated to {@link InputPartition}s, which are returned by
- * {@link #planInputPartitions()}.
- *
- * There are mainly 3 kinds of query optimizations:
- *   1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
- *      pruning), etc. Names of these interfaces start with `SupportsPushDown`.
- *   2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
- *      Names of these interfaces start with `SupportsReporting`.
- *   3. Columnar scan if implements {@link SupportsScanColumnarBatch}.
- *
- * If an exception was throw when applying any of these query optimizations, the action will fail
- * and no Spark job will be submitted.
- *
- * Spark first applies all operator push-down optimizations that this data source supports. Then
- * Spark collects information this data source reported for further optimizations. Finally Spark
- * issues the scan request and does the actual data reading.
- */
-@InterfaceStability.Evolving
-public interface DataSourceReader {
-
-  /**
-   * Returns the actual schema of this data source reader, which may be different from the physical
-   * schema of the underlying storage, as column pruning or other optimizations may happen.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   */
-  StructType readSchema();
-
-  /**
-   * Returns a list of {@link InputPartition}s. Each {@link InputPartition} is responsible for
-   * creating a data reader to output data of one RDD partition. The number of input partitions
-   * returned here is the same as the number of RDD partitions this scan outputs.
-   *
-   * Note that, this may not be a full scan if the data source reader mixes in other optimization
-   * interfaces like column pruning, filter push-down, etc. These optimizations are applied before
-   * Spark issues the scan request.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   */
-  List<InputPartition<InternalRow>> planInputPartitions();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
index f2038d0..95c30de 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
@@ -22,18 +22,18 @@ import java.io.Serializable;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
- * responsible for creating the actual data reader of one RDD partition.
- * The relationship between {@link InputPartition} and {@link InputPartitionReader}
- * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
+ * A serializable representation of an input partition returned by
+ * {@link ReadSupport#planInputPartitions(ScanConfig)}.
  *
- * Note that {@link InputPartition}s will be serialized and sent to executors, then
- * {@link InputPartitionReader}s will be created on executors to do the actual reading. So
- * {@link InputPartition} must be serializable while {@link InputPartitionReader} doesn't need to
- * be.
+ * Note that {@link InputPartition} will be serialized and sent to executors, then
+ * {@link PartitionReader} will be created by
+ * {@link PartitionReaderFactory#createReader(InputPartition)} or
+ * {@link PartitionReaderFactory#createColumnarReader(InputPartition)} on executors to do
+ * the actual reading. So {@link InputPartition} must be serializable while {@link PartitionReader}
+ * doesn't need to be.
  */
 @InterfaceStability.Evolving
-public interface InputPartition<T> extends Serializable {
+public interface InputPartition extends Serializable {
 
   /**
    * The preferred locations where the input partition reader returned by this partition can run
@@ -51,12 +51,4 @@ public interface InputPartition<T> extends Serializable {
   default String[] preferredLocations() {
     return new String[0];
   }
-
-  /**
-   * Returns an input partition reader to do the actual reading work.
-   *
-   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
-   * get retried until hitting the maximum retry times.
-   */
-  InputPartitionReader<T> createPartitionReader();
 }
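
Since InputPartition no longer creates its own reader, a typical implementation is now just a
serializable description of one split. A minimal sketch (RangeInputPartition and its fields are
hypothetical):

    import org.apache.spark.sql.sources.v2.reader.InputPartition;

    public class RangeInputPartition implements InputPartition {
      final long start;
      final long end;
      final String preferredHost;  // hypothetical locality hint

      RangeInputPartition(long start, long end, String preferredHost) {
        this.start = start;
        this.end = end;
        this.preferredHost = preferredHost;
      }

      @Override
      public String[] preferredLocations() {
        return new String[] { preferredHost };
      }
    }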

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
deleted file mode 100644
index f3ff7f5..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An input partition reader returned by {@link InputPartition#createPartitionReader()} and is
- * responsible for outputting data for a RDD partition.
- *
- * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
- * for normal data source readers, {@link org.apache.spark.sql.vectorized.ColumnarBatch} for data
- * source readers that mix in {@link SupportsScanColumnarBatch}.
- */
-@InterfaceStability.Evolving
-public interface InputPartitionReader<T> extends Closeable {
-
-  /**
-   * Proceed to next record, returns false if there is no more records.
-   *
-   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
-   * get retried until hitting the maximum retry times.
-   *
-   * @throws IOException if failure happens during disk/network IO like reading files.
-   */
-  boolean next() throws IOException;
-
-  /**
-   * Return the current record. This method should return same value until `next` is called.
-   *
-   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
-   * get retried until hitting the maximum retry times.
-   */
-  T get();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java
new file mode 100644
index 0000000..04ff8d0
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
+ * {@link PartitionReaderFactory#createColumnarReader(InputPartition)}. It's responsible for
+ * outputting data for an RDD partition.
+ *
+ * Note that currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
+ * for normal data sources, or {@link org.apache.spark.sql.vectorized.ColumnarBatch} for columnar
+ * data sources (whose {@link PartitionReaderFactory#supportColumnarReads(InputPartition)}
+ * returns true).
+ */
+@InterfaceStability.Evolving
+public interface PartitionReader<T> extends Closeable {
+
+  /**
+   * Proceeds to the next record, and returns false if there are no more records.
+   *
+   * @throws IOException if failure happens during disk/network IO like 
reading files.
+   */
+  boolean next() throws IOException;
+
+  /**
+   * Returns the current record. This method should return the same value until `next` is called.
+   */
+  T get();
+}
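
For orientation, a minimal row-based reader over an in-memory integer range might look like the sketch below. The class name and the range fields are hypothetical and not part of this patch; only the PartitionReader contract (next/get/close) comes from the interface above.

import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.v2.reader.PartitionReader;

// Hypothetical reader over [start, end); illustrative only.
class RangePartitionReader implements PartitionReader<InternalRow> {
  private final int end;
  private int current;

  RangePartitionReader(int start, int end) {
    this.current = start - 1;
    this.end = end;
  }

  @Override
  public boolean next() throws IOException {
    // Advance the cursor; false signals that this partition is exhausted.
    current += 1;
    return current < end;
  }

  @Override
  public InternalRow get() {
    // Must keep returning the same row until next() is called again.
    return new GenericInternalRow(new Object[] { current });
  }

  @Override
  public void close() {
    // Nothing to release for an in-memory range.
  }
}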

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java
new file mode 100644
index 0000000..f35de93
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A factory used to create {@link PartitionReader} instances.
+ *
+ * If Spark fails to execute any method in an implementation of this interface or in the returned
+ * {@link PartitionReader} (by throwing an exception), the corresponding Spark task will fail and
+ * be retried until the maximum number of retries is reached.
+ */
+@InterfaceStability.Evolving
+public interface PartitionReaderFactory extends Serializable {
+
+  /**
+   * Returns a row-based partition reader to read data from the given {@link 
InputPartition}.
+   *
+   * Implementations probably need to cast the input partition to the concrete
+   * {@link InputPartition} class defined for the data source.
+   */
+  PartitionReader<InternalRow> createReader(InputPartition partition);
+
+  /**
+   * Returns a columnar partition reader to read data from the given {@link 
InputPartition}.
+   *
+   * Implementations probably need to cast the input partition to the concrete
+   * {@link InputPartition} class defined for the data source.
+   */
+  default PartitionReader<ColumnarBatch> createColumnarReader(InputPartition 
partition) {
+    throw new UnsupportedOperationException("Cannot create columnar reader.");
+  }
+
+  /**
+   * Returns true if the given {@link InputPartition} should be read by Spark in a columnar way.
+   * This means implementations must also implement {@link #createColumnarReader(InputPartition)}
+   * for the input partitions for which this method returns true.
+   *
+   * As of Spark 2.4, Spark can only read all input partitions in a columnar way, or none of them.
+   * Data sources can't mix columnar and row-based partitions. This may be relaxed in future
+   * versions.
+   */
+  default boolean supportColumnarReads(InputPartition partition) {
+    return false;
+  }
+}
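
A matching row-only factory could be as small as the following sketch. RangeInputPartition is a hypothetical split description (this patch implies InputPartition is now a plain serializable marker, so the concrete class just carries the bounds), and RangePartitionReader is the reader sketched above; one shared package is assumed.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.PartitionReader;
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;

// Hypothetical serializable split description; not defined in this patch.
class RangeInputPartition implements InputPartition {
  final int start;
  final int end;

  RangeInputPartition(int start, int end) {
    this.start = start;
    this.end = end;
  }
}

class RangeReaderFactory implements PartitionReaderFactory {
  @Override
  public PartitionReader<InternalRow> createReader(InputPartition partition) {
    // Cast to the data source's own InputPartition class, as the javadoc suggests.
    RangeInputPartition p = (RangeInputPartition) partition;
    return new RangePartitionReader(p.start, p.end);
  }

  // supportColumnarReads keeps its default of false, so createColumnarReader is
  // never called and its default UnsupportedOperationException is acceptable.
}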

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
new file mode 100644
index 0000000..a58ddb2
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for all the batch and streaming read supports. Data 
sources should implement
+ * concrete read support interfaces like {@link BatchReadSupport}.
+ *
+ * If Spark fails to execute any methods in the implementations of this 
interface (by throwing an
+ * exception), the read action will fail and no Spark job will be submitted.
+ */
+@InterfaceStability.Evolving
+public interface ReadSupport {
+
+  /**
+   * Returns the full schema of this data source, which is usually the 
physical schema of the
+   * underlying storage. This full schema should not be affected by column 
pruning or other
+   * optimizations.
+   */
+  StructType fullSchema();
+
+  /**
+   * Returns a list of {@link InputPartition input partitions}. Each {@link 
InputPartition}
+   * represents a data split that can be processed by one Spark task. The 
number of input
+   * partitions returned here is the same as the number of RDD partitions this 
scan outputs.
+   *
+   * Note that this may not be a full scan if the data source supports optimizations like filter
+   * push-down. Implementations should check the input {@link ScanConfig} and adjust the resulting
+   * {@link InputPartition input partitions}.
+   */
+  InputPartition[] planInputPartitions(ScanConfig config);
+}
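
Tying the pieces together, a toy ReadSupport for the same range source could plan one partition per task as sketched below. The class and constants are illustrative only; a real batch source would also implement the BatchReadSupport interface referenced above and consult the pushed-down state carried by its concrete ScanConfig. RangeInputPartition is the hypothetical split from the previous sketch.

import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

class RangeReadSupport implements ReadSupport {
  private static final int NUM_PARTITIONS = 4;
  private static final int ROWS_PER_PARTITION = 100;

  @Override
  public StructType fullSchema() {
    // The unpruned, physical schema of the source.
    return new StructType().add("i", DataTypes.IntegerType);
  }

  @Override
  public InputPartition[] planInputPartitions(ScanConfig config) {
    // One InputPartition per Spark task; a real source would inspect the
    // pushed-down state in its concrete ScanConfig here.
    InputPartition[] partitions = new InputPartition[NUM_PARTITIONS];
    for (int i = 0; i < NUM_PARTITIONS; i++) {
      partitions[i] = new RangeInputPartition(
          i * ROWS_PER_PARTITION, (i + 1) * ROWS_PER_PARTITION);
    }
    return partitions;
  }
}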

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
new file mode 100644
index 0000000..7462ce2
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface that carries query specific information for the data scanning 
job, like operator
+ * pushdown information and streaming query offsets. This is defined as an 
empty interface, and data
+ * sources should define their own {@link ScanConfig} classes.
+ *
+ * For APIs that take a {@link ScanConfig} as input, like
+ * {@link ReadSupport#planInputPartitions(ScanConfig)},
+ * {@link BatchReadSupport#createReaderFactory(ScanConfig)} and
+ * {@link SupportsReportStatistics#estimateStatistics(ScanConfig)}, 
implementations mostly need to
+ * cast the input {@link ScanConfig} to the concrete {@link ScanConfig} class 
of the data source.
+ */
+@InterfaceStability.Evolving
+public interface ScanConfig {
+
+  /**
+   * Returns the actual schema of this data source reader, which may be 
different from the physical
+   * schema of the underlying storage, as column pruning or other 
optimizations may happen.
+   *
+   * If this method fails (by throwing an exception), the action will fail and 
no Spark job will be
+   * submitted.
+   */
+  StructType readSchema();
+}
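
Since ScanConfig itself is deliberately thin, each source defines its own concrete class. For the hypothetical range source, an immutable config carrying the (possibly pruned) read schema and any pushed filters could look like this; the names are illustrative only.

import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.types.StructType;

// Hypothetical immutable per-scan state for the range source.
class RangeScanConfig implements ScanConfig {
  private final StructType readSchema;
  private final Filter[] pushedFilters;

  RangeScanConfig(StructType readSchema, Filter[] pushedFilters) {
    this.readSchema = readSchema;
    this.pushedFilters = pushedFilters;
  }

  @Override
  public StructType readSchema() {
    // Already reflects any column pruning applied while building this config.
    return readSchema;
  }

  Filter[] pushedFilters() {
    return pushedFilters;
  }
}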

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
new file mode 100644
index 0000000..4c0eedf
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface for building the {@link ScanConfig}. Implementations can mix in the
+ * SupportsPushDownXYZ interfaces to do operator pushdown, and keep the pushdown result in
+ * the returned {@link ScanConfig}.
+ */
+@InterfaceStability.Evolving
+public interface ScanConfigBuilder {
+  ScanConfig build();
+}
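
A builder for that config, mixing in the SupportsPushDownRequiredColumns interface that this patch retargets further down, might look like the following sketch. It is hypothetical and assumes the RangeScanConfig class from the previous sketch in the same package.

import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical builder: starts from the full schema and records whatever
// pruning Spark pushes down before build() is called.
class RangeScanConfigBuilder implements ScanConfigBuilder, SupportsPushDownRequiredColumns {
  private StructType readSchema = new StructType().add("i", DataTypes.IntegerType);

  @Override
  public void pruneColumns(StructType requiredSchema) {
    this.readSchema = requiredSchema;
  }

  @Override
  public ScanConfig build() {
    // No filters pushed in this sketch.
    return new RangeScanConfig(readSchema, new Filter[0]);
  }
}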

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
index e8cd7ad..44799c7 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.InterfaceStability;
 
 /**
  * An interface to represent statistics for a data source, which is returned by
- * {@link SupportsReportStatistics#getStatistics()}.
+ * {@link SupportsReportStatistics#estimateStatistics(ScanConfig)}.
  */
 @InterfaceStability.Evolving
 public interface Statistics {

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java
deleted file mode 100644
index 595943c..0000000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
-
-import java.util.List;
-
-/**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can 
implement this
- * interface to output {@link Row} instead of {@link InternalRow}.
- * This is an experimental and unstable interface.
- */
-@InterfaceStability.Unstable
-public interface SupportsDeprecatedScanRow extends DataSourceReader {
-  default List<InputPartition<InternalRow>> planInputPartitions() {
-    throw new IllegalStateException(
-        "planInputPartitions not supported by default within 
SupportsDeprecatedScanRow");
-  }
-
-  List<InputPartition<Row>> planRowInputPartitions();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
index 4543c14..9d79a18 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.catalyst.expressions.Expression;
 
 /**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can 
implement this
+ * A mix-in interface for {@link ScanConfigBuilder}. Data sources can implement this
  * interface to push down arbitrary expressions as predicates to the data 
source.
  * This is an experimental and unstable interface as {@link Expression} is not 
public and may get
  * changed in the future Spark versions.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression;
  * process this interface.
  */
 @InterfaceStability.Unstable
-public interface SupportsPushDownCatalystFilters extends DataSourceReader {
+public interface SupportsPushDownCatalystFilters extends ScanConfigBuilder {
 
   /**
    * Pushes down filters, and returns filters that need to be evaluated after 
scanning.

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
index b6a90a3..5d32a8a 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
@@ -21,15 +21,15 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.Filter;
 
 /**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can 
implement this
- * interface to push down filters to the data source and reduce the size of 
the data to be read.
+ * A mix-in interface for {@link ScanConfigBuilder}. Data sources can 
implement this interface to
+ * push down filters to the data source and reduce the size of the data to be 
read.
  *
  * Note that, if data source readers implement both this interface and
  * {@link SupportsPushDownCatalystFilters}, Spark will ignore this interface 
and only process
  * {@link SupportsPushDownCatalystFilters}.
  */
 @InterfaceStability.Evolving
-public interface SupportsPushDownFilters extends DataSourceReader {
+public interface SupportsPushDownFilters extends ScanConfigBuilder {
 
   /**
    * Pushes down filters, and returns filters that need to be evaluated after 
scanning.

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
index 427b4d0..edb1649 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
@@ -21,12 +21,12 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can 
implement this
+ * A mix-in interface for {@link ScanConfigBuilder}. Data sources can 
implement this
  * interface to push down required columns to the data source and only read 
these columns during
  * scan to reduce the size of the data to be read.
  */
 @InterfaceStability.Evolving
-public interface SupportsPushDownRequiredColumns extends DataSourceReader {
+public interface SupportsPushDownRequiredColumns extends ScanConfigBuilder {
 
   /**
    * Applies column pruning w.r.t. the given requiredSchema.
@@ -35,8 +35,8 @@ public interface SupportsPushDownRequiredColumns extends 
DataSourceReader {
    * also OK to do the pruning partially, e.g., a data source may not be able 
to prune nested
    * fields, and only prune top-level columns.
    *
-   * Note that, data source readers should update {@link 
DataSourceReader#readSchema()} after
-   * applying column pruning.
+   * Note that the {@link ScanConfig#readSchema()} implementation should take the column
+   * pruning applied here into account.
    */
   void pruneColumns(StructType requiredSchema);
 }
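
Nothing in this hunk shows how Spark drives the mix-in, but the revised note implies a sequence along these lines, using the hypothetical RangeScanConfigBuilder sketched earlier; the required schema stands in for whatever the optimizer derives.

import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Conceptual driving sequence; the real call sites live in Spark's planner.
class PruningContractExample {
  static ScanConfig planScan() {
    StructType required = new StructType().add("i", DataTypes.IntegerType);
    RangeScanConfigBuilder builder = new RangeScanConfigBuilder();
    builder.pruneColumns(required);       // pruning is pushed into the builder...
    ScanConfig config = builder.build();  // ...and baked into the resulting ScanConfig,
    assert config.readSchema().equals(required);  // whose readSchema() must reflect it.
    return config;
  }
}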

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
index 6b60da7..db62cd4 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
@@ -21,17 +21,17 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
 
 /**
- * A mix in interface for {@link DataSourceReader}. Data source readers can 
implement this
- * interface to report data partitioning and try to avoid shuffle at Spark 
side.
+ * A mix-in interface for {@link BatchReadSupport}. Data sources can implement this interface to
+ * report data partitioning and try to avoid shuffles on the Spark side.
  *
- * Note that, when the reader creates exactly one {@link InputPartition}, 
Spark may avoid
- * adding a shuffle even if the reader does not implement this interface.
+ * Note that, when a {@link ReadSupport} implementation creates exactly one 
{@link InputPartition},
+ * Spark may avoid adding a shuffle even if the reader does not implement this 
interface.
  */
 @InterfaceStability.Evolving
-public interface SupportsReportPartitioning extends DataSourceReader {
+public interface SupportsReportPartitioning extends ReadSupport {
 
   /**
    * Returns the output data partitioning that this reader guarantees.
    */
-  Partitioning outputPartitioning();
+  Partitioning outputPartitioning(ScanConfig config);
 }
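
With the new per-scan signature, a partitioning report for the hypothetical range source could look like the sketch below. Partitioning's numPartitions()/satisfy(Distribution) methods and ClusteredDistribution's clusteredColumns field are assumed unchanged by this patch (only their javadoc is touched here); RangeReadSupport is the earlier sketch in the same package.

import java.util.Arrays;

import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

class PartitionedRangeReadSupport extends RangeReadSupport implements SupportsReportPartitioning {

  @Override
  public Partitioning outputPartitioning(ScanConfig config) {
    return new Partitioning() {
      @Override
      public int numPartitions() {
        return 4;  // must match what planInputPartitions(config) returns
      }

      @Override
      public boolean satisfy(Distribution distribution) {
        // Records sharing the same "i" value are produced by the same reader.
        return distribution instanceof ClusteredDistribution
            && Arrays.asList(((ClusteredDistribution) distribution).clusteredColumns)
                .contains("i");
      }
    };
  }
}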

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
index 9263964..1831488 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
@@ -20,18 +20,18 @@ package org.apache.spark.sql.sources.v2.reader;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A mix in interface for {@link DataSourceReader}. Data source readers can 
implement this
- * interface to report statistics to Spark.
+ * A mix-in interface for {@link BatchReadSupport}. Data sources can implement this interface to
+ * report statistics to Spark.
  *
- * Statistics are reported to the optimizer before any operator is pushed to 
the DataSourceReader.
- * Implementations that return more accurate statistics based on pushed 
operators will not improve
- * query performance until the planner can push operators before getting stats.
+ * As of Spark 2.4, statistics are reported to the optimizer before any 
operator is pushed to the
+ * data source. Implementations that return more accurate statistics based on 
pushed operators will
+ * not improve query performance until the planner can push operators before 
getting stats.
  */
 @InterfaceStability.Evolving
-public interface SupportsReportStatistics extends DataSourceReader {
+public interface SupportsReportStatistics extends ReadSupport {
 
   /**
-   * Returns the basic statistics of this data source.
+   * Returns the estimated statistics of this data source scan.
    */
-  Statistics getStatistics();
+  Statistics estimateStatistics(ScanConfig config);
 }
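
The statistics hook follows the same per-scan pattern. A sketch for the range source is below; Statistics' OptionalLong-based sizeInBytes()/numRows() accessors are assumed unchanged by this patch, and RangeReadSupport is the earlier sketch in the same package.

import java.util.OptionalLong;

import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.sources.v2.reader.Statistics;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;

class StatsRangeReadSupport extends RangeReadSupport implements SupportsReportStatistics {

  @Override
  public Statistics estimateStatistics(ScanConfig config) {
    // 4 partitions * 100 rows; 4 bytes per int column actually read after pruning.
    long rows = 400L;
    long bytes = rows * 4L * config.readSchema().fields().length;
    return new Statistics() {
      @Override
      public OptionalLong sizeInBytes() {
        return OptionalLong.of(bytes);
      }

      @Override
      public OptionalLong numRows() {
        return OptionalLong.of(rows);
      }
    };
  }
}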

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
deleted file mode 100644
index f4da686..0000000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.util.List;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-
-/**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can 
implement this
- * interface to output {@link ColumnarBatch} and make the scan faster.
- */
-@InterfaceStability.Evolving
-public interface SupportsScanColumnarBatch extends DataSourceReader {
-  @Override
-  default List<InputPartition<InternalRow>> planInputPartitions() {
-    throw new IllegalStateException(
-      "planInputPartitions not supported by default within 
SupportsScanColumnarBatch.");
-  }
-
-  /**
-   * Similar to {@link DataSourceReader#planInputPartitions()}, but returns 
columnar data
-   * in batches.
-   */
-  List<InputPartition<ColumnarBatch>> planBatchInputPartitions();
-
-  /**
-   * Returns true if the concrete data source reader can read data in batch 
according to the scan
-   * properties like required columns, pushes filters, etc. It's possible that 
the implementation
-   * can only support some certain columns with certain types. Users can 
overwrite this method and
-   * {@link #planInputPartitions()} to fallback to normal read path under some 
conditions.
-   */
-  default boolean enableBatchRead() {
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
index 38ca5fc..6764d4b 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
@@ -18,12 +18,12 @@
 package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.PartitionReader;
 
 /**
  * A concrete implementation of {@link Distribution}. Represents a 
distribution where records that
  * share the same values for the {@link #clusteredColumns} will be produced by 
the same
- * {@link InputPartitionReader}.
+ * {@link PartitionReader}.
  */
 @InterfaceStability.Evolving
 public class ClusteredDistribution implements Distribution {

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
index 5e32ba6..364a3f5 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
@@ -18,14 +18,14 @@
 package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.PartitionReader;
 
 /**
  * An interface to represent data distribution requirement, which specifies 
how the records should
- * be distributed among the data partitions (one {@link InputPartitionReader} 
outputs data for one
+ * be distributed among the data partitions (one {@link PartitionReader} 
outputs data for one
  * partition).
  * Note that this interface has nothing to do with the data ordering inside one
- * partition(the output records of a single {@link InputPartitionReader}).
+ * partition(the output records of a single {@link PartitionReader}).
  *
  * The instance of this interface is created and provided by Spark, then 
consumed by
  * {@link Partitioning#satisfy(Distribution)}. This means data source 
developers don't need to

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
index f460f6b..fb0b6f1 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
@@ -19,12 +19,13 @@ package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
 
 /**
  * An interface to represent the output data partitioning for a data source, 
which is returned by
- * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this 
should work like a
- * snapshot. Once created, it should be deterministic and always report the 
same number of
+ * {@link SupportsReportPartitioning#outputPartitioning(ScanConfig)}. Note 
that this should work
+ * like a snapshot. Once created, it should be deterministic and always report 
the same number of
  * partitions and the same "satisfy" result for a certain distribution.
  */
 @InterfaceStability.Evolving

