http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
deleted file mode 100644
index 7b0ba0b..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-
-/**
- * A variation on {@link InputPartitionReader} for use with streaming in continuous processing mode.
- */
-@InterfaceStability.Evolving
-public interface ContinuousInputPartitionReader<T> extends InputPartitionReader<T> {
-    /**
-     * Get the offset of the current record, or the start offset if no records have been read.
-     *
-     * The execution engine will call this method along with get() to keep track of the current
-     * offset. When an epoch ends, the offset of the previous record in each partition will be saved
-     * as a restart checkpoint.
-     */
-    PartitionOffset getOffset();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
new file mode 100644
index 0000000..9101c8a
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.PartitionReader;
+
+/**
+ * A variation on {@link PartitionReader} for use with continuous streaming processing.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousPartitionReader<T> extends PartitionReader<T> {
+
+  /**
+   * Get the offset of the current record, or the start offset if no records have been read.
+   *
+   * The execution engine will call this method along with get() to keep track of the current
+   * offset. When an epoch ends, the offset of the previous record in each partition will be saved
+   * as a restart checkpoint.
+   */
+  PartitionOffset getOffset();
+}
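
For readers new to this API, here is a minimal sketch of an implementation. It is illustrative only and not part of this patch: LongPartitionOffset and FixedRowsContinuousReader are hypothetical names, and the sketch assumes the parent PartitionReader exposes next()/get()/close() as in the batch reader API.

import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

/** Hypothetical offset type: a position within a single partition. */
class LongPartitionOffset implements PartitionOffset {
  final long position;
  LongPartitionOffset(long position) { this.position = position; }
}

/** Hypothetical reader that replays a fixed set of rows; a real reader would block for new data. */
class FixedRowsContinuousReader implements ContinuousPartitionReader<InternalRow> {
  private final InternalRow[] rows;
  private final long startOffset;
  private int index = -1;

  FixedRowsContinuousReader(InternalRow[] rows, long startOffset) {
    this.rows = rows;
    this.startOffset = startOffset;
  }

  @Override
  public boolean next() throws IOException {
    index += 1;
    return index < rows.length;
  }

  @Override
  public InternalRow get() {
    return rows[index];
  }

  @Override
  public PartitionOffset getOffset() {
    // Offsets in this sketch count records consumed since startOffset, so before any record
    // is read this returns the start offset, as the contract above requires.
    return new LongPartitionOffset(startOffset + index + 1);
  }

  @Override
  public void close() throws IOException {
    // Nothing to release in this sketch.
  }
}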

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java
new file mode 100644
index 0000000..2d9f1ca
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A variation on {@link PartitionReaderFactory} that returns {@link ContinuousPartitionReader}
+ * instead of {@link org.apache.spark.sql.sources.v2.reader.PartitionReader}. It's used for
+ * continuous streaming processing.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousPartitionReaderFactory extends PartitionReaderFactory {
+  @Override
+  ContinuousPartitionReader<InternalRow> createReader(InputPartition partition);
+
+  @Override
+  default ContinuousPartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
+    throw new UnsupportedOperationException("Cannot create columnar reader.");
+  }
+}
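
A matching factory could look like the sketch below, under the same assumptions as the reader example above. FixedRowsPartition is a hypothetical InputPartition carrying the rows and start offset; a real partition would carry connection details for an external system instead.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousPartitionReaderFactory;

/** Hypothetical partition descriptor, serialized to executors along with the factory. */
class FixedRowsPartition implements InputPartition {
  final InternalRow[] rows;
  final long startOffset;
  FixedRowsPartition(InternalRow[] rows, long startOffset) {
    this.rows = rows;
    this.startOffset = startOffset;
  }
}

class FixedRowsContinuousReaderFactory implements ContinuousPartitionReaderFactory {
  @Override
  public ContinuousPartitionReader<InternalRow> createReader(InputPartition partition) {
    FixedRowsPartition p = (FixedRowsPartition) partition;
    return new FixedRowsContinuousReader(p.rows, p.startOffset);
  }

  // createColumnarReader keeps the default above and throws, since this sketched source
  // only produces row-based data.
}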

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
new file mode 100644
index 0000000..9a3ad2e
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface that defines how to load the data from data source for continuous streaming
+ * processing.
+ *
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
+ * streaming query, then call {@link #newScanConfigBuilder(Offset)} and create an instance of
+ * {@link ScanConfig} for the duration of the streaming query or until
+ * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
+ * input partitions and reader factory to scan data with a Spark job for its duration. At the end
+ * {@link #stop()} will be called when the streaming execution is completed. Note that a single
+ * query may have multiple executions due to restart or failure recovery.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
+   * {@link ScanConfig} for each data scanning job.
+   *
+   * The builder can take some query specific information to do operator pushdown, store streaming
+   * offsets, etc., and keep this information in the created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
+   * need to take {@link ScanConfig} as an input.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory, which produces one {@link ContinuousPartitionReader} for one
+   * {@link InputPartition}.
+   */
+  ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config);
+
+  /**
+   * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
+   * for each partition to a single global offset.
+   */
+  Offset mergeOffsets(PartitionOffset[] offsets);
+
+  /**
+   * The execution engine will call this method in every epoch to determine if new input
+   * partitions need to be generated, which may be required if for example the underlying
+   * source system has had partitions added or removed.
+   *
+   * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport}
+   * instance.
+   */
+  default boolean needsReconfiguration(ScanConfig config) {
+    return false;
+  }
+}
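
To make the mergeOffsets contract concrete, here is one hedged possibility built on the LongPartitionOffset sketch above. It assumes that Offset's only abstract method is json() (the Offset diff below shows only its javadoc), and it leaves scan planning and lifecycle methods abstract because ScanConfig, ScanConfigBuilder and the provider interfaces are not part of this excerpt.

import java.util.Arrays;

import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

/** Hypothetical global offset: one position per partition. */
class PositionsOffset extends Offset {
  final long[] positions;
  PositionsOffset(long[] positions) { this.positions = positions; }

  @Override
  public String json() {
    return Arrays.toString(positions);    // e.g. "[12, 40, 7]"
  }
}

abstract class SketchContinuousReadSupport implements ContinuousReadSupport {
  @Override
  public Offset mergeOffsets(PartitionOffset[] offsets) {
    // Each entry is the latest per-partition offset reported by one ContinuousPartitionReader.
    long[] positions = new long[offsets.length];
    for (int i = 0; i < offsets.length; i++) {
      positions[i] = ((LongPartitionOffset) offsets[i]).position;
    }
    return new PositionsOffset(positions);
  }

  // needsReconfiguration keeps its default (false): the set of partitions never changes here.
}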

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
deleted file mode 100644
index 6e960be..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-
-import java.util.Optional;
-
-/**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
- * interface to allow reading in a continuous processing mode stream.
- *
- * Implementations must ensure each partition reader is a {@link ContinuousInputPartitionReader}.
- *
- * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
- * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
- */
-@InterfaceStability.Evolving
-public interface ContinuousReader extends BaseStreamingSource, DataSourceReader {
-    /**
-     * Merge partitioned offsets coming from {@link ContinuousInputPartitionReader} instances
-     * for each partition to a single global offset.
-     */
-    Offset mergeOffsets(PartitionOffset[] offsets);
-
-    /**
-     * Deserialize a JSON string into an Offset of the implementation-defined offset type.
-     * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
-     */
-    Offset deserializeOffset(String json);
-
-    /**
-     * Set the desired start offset for partitions created from this reader. The scan will
-     * start from the first record after the provided offset, or from an implementation-defined
-     * inferred starting point if no offset is provided.
-     */
-    void setStartOffset(Optional<Offset> start);
-
-    /**
-     * Return the specified or inferred start offset for this reader.
-     *
-     * @throws IllegalStateException if setStartOffset has not been called
-     */
-    Offset getStartOffset();
-
-    /**
-     * The execution engine will call this method in every epoch to determine if new input
-     * partitions need to be generated, which may be required if for example the underlying
-     * source system has had partitions added or removed.
-     *
-     * If true, the query will be shut down and restarted with a new reader.
-     */
-    default boolean needsReconfiguration() {
-        return false;
-    }
-
-    /**
-     * Informs the source that Spark has completed processing all data for offsets less than or
-     * equal to `end` and will only request offsets greater than `end` in the future.
-     */
-    void commit(Offset end);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
new file mode 100644
index 0000000..edb0db1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.*;
+
+/**
+ * An interface that defines how to scan the data from data source for micro-batch streaming
+ * processing.
+ *
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.MicroBatchReadSupportProvider}) at the start of a
+ * streaming query, then call {@link #newScanConfigBuilder(Offset, Offset)} and create an instance
+ * of {@link ScanConfig} for each micro-batch. The {@link ScanConfig} will be used to create input
+ * partitions and reader factory to scan a micro-batch with a Spark job. At the end {@link #stop()}
+ * will be called when the streaming execution is completed. Note that a single query may have
+ * multiple executions due to restart or failure recovery.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
+   * {@link ScanConfig} for each data scanning job.
+   *
+   * The builder can take some query specific information to do operator pushdown, store streaming
+   * offsets, etc., and keep this information in the created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport}
+   * need to take {@link ScanConfig} as an input.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
+
+  /**
+   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
+   */
+  PartitionReaderFactory createReaderFactory(ScanConfig config);
+
+  /**
+   * Returns the most recent offset available.
+   */
+  Offset latestOffset();
+}
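
An analogous, purely illustrative outline for the micro-batch side. CountOffset and the in-memory log are hypothetical, the sketch again assumes Offset's single abstract method is json(), and scan planning is left abstract because ScanConfig and ScanConfigBuilder are not shown in this excerpt.

import java.util.List;

import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;

/** Hypothetical offset: how many entries of an append-only log have been seen. */
class CountOffset extends Offset {
  final long count;
  CountOffset(long count) { this.count = count; }

  @Override
  public String json() {
    return Long.toString(count);
  }

  static CountOffset fromJson(String json) {
    return new CountOffset(Long.parseLong(json));
  }
}

abstract class InMemoryLogMicroBatchReadSupport implements MicroBatchReadSupport {
  private final List<String> log;          // hypothetical shared, append-only log

  InMemoryLogMicroBatchReadSupport(List<String> log) {
    this.log = log;
  }

  @Override
  public Offset initialOffset() {
    // Only used for a fresh query; on restart Spark passes back a checkpointed offset instead.
    return new CountOffset(0);
  }

  @Override
  public Offset latestOffset() {
    // Everything appended so far can be included in the next micro-batch.
    return new CountOffset(log.size());
  }

  @Override
  public Offset deserializeOffset(String json) {
    return CountOffset.fromJson(json);
  }

  @Override
  public void commit(Offset end) {
    // Data at or before `end` will never be requested again; a real source could drop it here.
  }
}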

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
deleted file mode 100644
index 0159c73..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-
-import java.util.Optional;
-
-/**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
- * interface to indicate they allow micro-batch streaming reads.
- *
- * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
- * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource {
-    /**
-     * Set the desired offset range for input partitions created from this reader. Partition readers
-     * will generate only data within (`start`, `end`]; that is, from the first record after `start`
-     * to the record with offset `end`.
-     *
-     * @param start The initial offset to scan from. If not specified, scan from an
-     *              implementation-specified start point, such as the earliest available record.
-     * @param end The last offset to include in the scan. If not specified, scan up to an
-     *            implementation-defined endpoint, such as the last available offset
-     *            or the start offset plus a target batch size.
-     */
-    void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
-
-    /**
-     * Returns the specified (if explicitly set through setOffsetRange) or inferred start offset
-     * for this reader.
-     *
-     * @throws IllegalStateException if setOffsetRange has not been called
-     */
-    Offset getStartOffset();
-
-    /**
-     * Return the specified (if explicitly set through setOffsetRange) or inferred end offset
-     * for this reader.
-     *
-     * @throws IllegalStateException if setOffsetRange has not been called
-     */
-    Offset getEndOffset();
-
-    /**
-     * Deserialize a JSON string into an Offset of the implementation-defined offset type.
-     * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
-     */
-    Offset deserializeOffset(String json);
-
-    /**
-     * Informs the source that Spark has completed processing all data for offsets less than or
-     * equal to `end` and will only request offsets greater than `end` in the future.
-     */
-    void commit(Offset end);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
index e41c035..6cf2773 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
@@ -20,8 +20,8 @@ package org.apache.spark.sql.sources.v2.reader.streaming;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * An abstract representation of progress through a {@link MicroBatchReader} or
- * {@link ContinuousReader}.
+ * An abstract representation of progress through a {@link MicroBatchReadSupport} or
+ * {@link ContinuousReadSupport}.
  * During execution, offsets provided by the data source implementation will be logged and used as
  * restart checkpoints. Each source should provide an offset implementation which the source can use
  * to reconstruct a position in the stream up to which data has been seen/processed.

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
new file mode 100644
index 0000000..84872d1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private and is invisible to data
+ * sources. Data sources should implement concrete streaming read support interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading from. Note that the
+   * streaming data source should not assume that it will start reading from its initial offset:
+   * if Spark is restarting an existing query, it will restart from the check-pointed offset rather
+   * than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
+   */
+  Offset deserializeOffset(String json);
+
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  void commit(Offset end);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
index 3b293d9..8693154 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
@@ -14,20 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.sources.v2.reader.streaming;
 
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.CustomMetrics;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 
 /**
- * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
- * interface to report custom metrics that gets reported under the
+ * A mix-in interface for {@link StreamingReadSupport}. Data sources can implement this interface
+ * to report custom metrics that get reported under the
  * {@link org.apache.spark.sql.streaming.SourceProgress}
- *
  */
 @InterfaceStability.Evolving
-public interface SupportsCustomReaderMetrics extends DataSourceReader {
+public interface SupportsCustomReaderMetrics extends StreamingReadSupport {
+
   /**
    * Returns custom metrics specific to this data source.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
new file mode 100644
index 0000000..0ec9e05
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface that defines how to write the data to data source for batch processing.
+ *
+ * The writing procedure is:
+ *   1. Create a writer factory by {@link #createBatchWriterFactory()}, serialize and send it to all
+ *      the partitions of the input data(RDD).
+ *   2. For each partition, create the data writer, and write the data of the partition with this
+ *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
+ *      exception happens during the writing, call {@link DataWriter#abort()}.
+ *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
+ *      some writers are aborted, or the job failed with an unknown reason, call
+ *      {@link #abort(WriterCommitMessage[])}.
+ *
+ * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
+ * do it manually in their Spark applications if they want to retry.
+ *
+ * Please refer to the documentation of commit/abort methods for detailed specifications.
+ */
+@InterfaceStability.Evolving
+public interface BatchWriteSupport {
+
+  /**
+   * Creates a writer factory which will be serialized and sent to executors.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   */
+  DataWriterFactory createBatchWriterFactory();
+
+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that at most one task for
+   * each partition commits.
+   *
+   * @return true if commit coordinator should be used, false otherwise.
+   */
+  default boolean useCommitCoordinator() {
+    return true;
+  }
+
+  /**
+   * Handles a commit message received from a successful data writer.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called.
+   */
+  default void onDataWriterCommit(WriterCommitMessage message) {}
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit messages are collected from
+   * successful data writers and are produced by {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
+   * is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it.
+   *
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the commit coordinator to allow at most one task to commit. Implementations can
+   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+   * tasks may have committed successfully and one successful commit message per task will be
+   * passed to this commit method. The remaining commit messages are ignored by Spark.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers failed and keep failing when retried,
+   * or the Spark job fails with some unknown reasons,
+   * or {@link #onDataWriterCommit(WriterCommitMessage)} fails,
+   * or {@link #commit(WriterCommitMessage[])} fails.
+   *
+   * If this method fails (by throwing an exception), the underlying data source may require manual
+   * cleanup.
+   *
+   * Unless the abort is triggered by the failure of commit, the given messages should have some
+   * null slots as there may be only a few data writers that are committed before the abort
+   * happens, or some data writers were committed but their commit messages haven't reached the
+   * driver when the abort is triggered. So this is just a "best effort" for data sources to
+   * clean up the data left by data writers.
+   */
+  void abort(WriterCommitMessage[] messages);
+}
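
As a rough illustration of the procedure described above (and not part of this patch), a file-based sink might stage output per task and publish it only on the driver-side commit. All names here are hypothetical; the StagedFileWriterFactory it returns is sketched after the DataWriterFactory diff further down.

import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

class StagedFilesWriteSupport implements BatchWriteSupport {

  /** Hypothetical commit message: the temporary file a writer produced. */
  static class StagedFileMessage implements WriterCommitMessage {
    final String stagedPath;
    StagedFileMessage(String stagedPath) { this.stagedPath = stagedPath; }
  }

  @Override
  public DataWriterFactory createBatchWriterFactory() {
    return new StagedFileWriterFactory();   // hypothetical factory, sketched further below
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // All tasks succeeded: publish every staged file, e.g. by renaming it into place.
    for (WriterCommitMessage message : messages) {
      publish(((StagedFileMessage) message).stagedPath);
    }
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Best effort: some slots may be null if a task never committed or its message never arrived.
    for (WriterCommitMessage message : messages) {
      if (message != null) {
        delete(((StagedFileMessage) message).stagedPath);
      }
    }
  }

  private void publish(String stagedPath) { /* move the staged file to its final location */ }
  private void delete(String stagedPath) { /* remove the staged file */ }
}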

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
deleted file mode 100644
index 385fc29..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.StreamWriteSupport;
-import org.apache.spark.sql.sources.v2.WriteSupport;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A data source writer that is returned by
- * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceOptions)}/
- * {@link StreamWriteSupport#createStreamWriter(
- * String, StructType, OutputMode, DataSourceOptions)}.
- * It can mix in various writing optimization interfaces to speed up the data saving. The actual
- * writing logic is delegated to {@link DataWriter}.
- *
- * If an exception was throw when applying any of these writing optimizations, the action will fail
- * and no Spark job will be submitted.
- *
- * The writing procedure is:
- *   1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the
- *      partitions of the input data(RDD).
- *   2. For each partition, create the data writer, and write the data of the partition with this
- *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
- *      exception happens during the writing, call {@link DataWriter#abort()}.
- *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
- *      some writers are aborted, or the job failed with an unknown reason, call
- *      {@link #abort(WriterCommitMessage[])}.
- *
- * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
- * do it manually in their Spark applications if they want to retry.
- *
- * Please refer to the documentation of commit/abort methods for detailed specifications.
- */
-@InterfaceStability.Evolving
-public interface DataSourceWriter {
-
-  /**
-   * Creates a writer factory which will be serialized and sent to executors.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   */
-  DataWriterFactory<InternalRow> createWriterFactory();
-
-  /**
-   * Returns whether Spark should use the commit coordinator to ensure that at most one task for
-   * each partition commits.
-   *
-   * @return true if commit coordinator should be used, false otherwise.
-   */
-  default boolean useCommitCoordinator() {
-    return true;
-  }
-
-  /**
-   * Handles a commit message on receiving from a successful data writer.
-   *
-   * If this method fails (by throwing an exception), this writing job is considered to to have been
-   * failed, and {@link #abort(WriterCommitMessage[])} would be called.
-   */
-  default void onDataWriterCommit(WriterCommitMessage message) {}
-
-  /**
-   * Commits this writing job with a list of commit messages. The commit messages are collected from
-   * successful data writers and are produced by {@link DataWriter#commit()}.
-   *
-   * If this method fails (by throwing an exception), this writing job is considered to to have been
-   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
-   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
-   *
-   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
-   * Spark uses the commit coordinator to allow at most one task to commit. Implementations can
-   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
-   * tasks may have committed successfully and one successful commit message per task will be
-   * passed to this commit method. The remaining commit messages are ignored by Spark.
-   */
-  void commit(WriterCommitMessage[] messages);
-
-  /**
-   * Aborts this writing job because some data writers are failed and keep failing when retry,
-   * or the Spark job fails with some unknown reasons,
-   * or {@link #onDataWriterCommit(WriterCommitMessage)} fails,
-   * or {@link #commit(WriterCommitMessage[])} fails.
-   *
-   * If this method fails (by throwing an exception), the underlying data source may require manual
-   * cleanup.
-   *
-   * Unless the abort is triggered by the failure of commit, the given messages should have some
-   * null slots as there maybe only a few data writers that are committed before the abort
-   * happens, or some data writers were committed but their commit messages haven't reached the
-   * driver when the abort is triggered. So this is just a "best effort" for data sources to
-   * clean up the data left by data writers.
-   */
-  void abort(WriterCommitMessage[] messages);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index 27dc5ea..5fb0679 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data writer returned by {@link DataWriterFactory#createDataWriter(int, long, long)} and is
+ * A data writer returned by {@link DataWriterFactory#createWriter(int, long)} and is
  * responsible for writing data for an input RDD partition.
  *
  * One Spark task has one exclusive data writer, so there is no thread-safe concern.
@@ -36,11 +36,11 @@ import org.apache.spark.annotation.InterfaceStability;
  *
  * If this data writer succeeds(all records are successfully written and {@link #commit()}
  * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
- * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
+ * {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit messages from other data
  * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
  * exception will be sent to the driver side, and Spark may retry this writing task a few times.
- * In each retry, {@link DataWriterFactory#createDataWriter(int, long, long)} will receive a
- * different `taskId`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])}
+ * In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a
+ * different `taskId`. Spark will call {@link BatchWriteSupport#abort(WriterCommitMessage[])}
  * when the configured number of retries is exhausted.
  *
  * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
@@ -71,11 +71,11 @@ public interface DataWriter<T> {
   /**
   * Commits this writer after all records are written successfully, returns a commit message which
   * will be sent back to driver side and passed to
-   * {@link DataSourceWriter#commit(WriterCommitMessage[])}.
+   * {@link BatchWriteSupport#commit(WriterCommitMessage[])}.
   *
   * The written data should only be visible to data source readers after
-   * {@link DataSourceWriter#commit(WriterCommitMessage[])} succeeds, which means this method
-   * should still "hide" the written data and ask the {@link DataSourceWriter} at driver side to
+   * {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which means this method
+   * should still "hide" the written data and ask the {@link BatchWriteSupport} at driver side to
   * do the final commit via {@link WriterCommitMessage}.
   *
   * If this method fails (by throwing an exception), {@link #abort()} will be called and this
@@ -93,7 +93,7 @@ public interface DataWriter<T> {
    * failed.
    *
   * If this method fails(by throwing an exception), the underlying data source may have garbage
-   * that need to be cleaned by {@link DataSourceWriter#abort(WriterCommitMessage[])} or manually,
+   * that need to be cleaned by {@link BatchWriteSupport#abort(WriterCommitMessage[])} or manually,
   * but these garbage should not be visible to data source readers.
   *
   * @throws IOException if failure happens during disk/network IO like writing files.

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
index 3d337b6..19a36dd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
@@ -19,18 +19,20 @@ package org.apache.spark.sql.sources.v2.writer;
 
 import java.io.Serializable;
 
+import org.apache.spark.TaskContext;
 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.InternalRow;
 
 /**
- * A factory of {@link DataWriter} returned by {@link DataSourceWriter#createWriterFactory()},
+ * A factory of {@link DataWriter} returned by {@link BatchWriteSupport#createBatchWriterFactory()},
  * which is responsible for creating and initializing the actual data writer at executor side.
  *
  * Note that, the writer factory will be serialized and sent to executors, then the data writer
- * will be created on executors and do the actual writing. So {@link DataWriterFactory} must be
+ * will be created on executors and do the actual writing. So this interface must be
  * serializable and {@link DataWriter} doesn't need to be.
  */
 @InterfaceStability.Evolving
-public interface DataWriterFactory<T> extends Serializable {
+public interface DataWriterFactory extends Serializable {
 
   /**
    * Returns a data writer to do the actual writing work. Note that, Spark 
will reuse the same data
@@ -38,19 +40,16 @@ public interface DataWriterFactory<T> extends Serializable {
   * are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
   * list.
   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
+   * If this method fails (by throwing an exception), the corresponding Spark write task would fail
+   * and get retried until hitting the maximum retry times.
   *
   * @param partitionId A unique id of the RDD partition that the returned writer will process.
   *                    Usually Spark processes many RDD partitions at the same time,
   *                    implementations should use the partition id to distinguish writers for
   *                    different partitions.
-   * @param taskId A unique identifier for a task that is performing the write of the partition
-   *               data. Spark may run multiple tasks for the same partition (due to speculation
-   *               or task failures, for example).
-   * @param epochId A monotonically increasing id for streaming queries that are split in to
-   *                discrete periods of execution. For non-streaming queries,
-   *                this ID will always be 0.
+   * @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run
+   *               multiple tasks for the same partition (due to speculation or task failures,
+   *               for example).
+   *               for example).
    */
-  DataWriter<T> createDataWriter(int partitionId, long taskId, long epochId);
+  DataWriter<InternalRow> createWriter(int partitionId, long taskId);
 }
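
A sketch of the executor side that pairs with the hypothetical StagedFilesWriteSupport above. The staging scheme and the use of plain java.io are illustrative assumptions; a real writer would serialize the InternalRow according to the write schema rather than calling toString().

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

class StagedFileWriterFactory implements DataWriterFactory {
  @Override
  public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
    // partitionId + taskId make the staged file unique even when tasks are retried or speculated.
    return new StagedFileDataWriter("/tmp/staging/part-" + partitionId + "-task-" + taskId);
  }
}

class StagedFileDataWriter implements DataWriter<InternalRow> {
  private final String stagedPath;
  private final BufferedWriter out;

  StagedFileDataWriter(String stagedPath) {
    this.stagedPath = stagedPath;
    try {
      File file = new File(stagedPath);
      file.getParentFile().mkdirs();
      this.out = new BufferedWriter(new FileWriter(file));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void write(InternalRow record) throws IOException {
    // The row object is reused by Spark, so it must be consumed (or copied) before returning.
    out.write(record.toString());   // placeholder; a real writer would use the write schema
    out.newLine();
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    out.close();
    // The data stays "hidden" in the staging path until BatchWriteSupport#commit publishes it.
    return new StagedFilesWriteSupport.StagedFileMessage(stagedPath);
  }

  @Override
  public void abort() throws IOException {
    out.close();
    new File(stagedPath).delete();
  }
}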

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
index 9e38836..123335c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
@@ -19,15 +19,16 @@ package org.apache.spark.sql.sources.v2.writer;
 
 import java.io.Serializable;
 
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
  * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
- * as the input parameter of {@link DataSourceWriter#commit(WriterCommitMessage[])}.
+ * as the input parameter of {@link BatchWriteSupport#commit(WriterCommitMessage[])} or
+ * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
  *
- * This is an empty interface, data sources should define their own message class and use it in
- * their {@link DataWriter#commit()} and {@link DataSourceWriter#commit(WriterCommitMessage[])}
- * implementations.
+ * This is an empty interface, data sources should define their own message class and use it when
+ * generating messages at executor side and handling the messages at driver side.
  */
 @InterfaceStability.Evolving
 public interface WriterCommitMessage extends Serializable {}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
deleted file mode 100644
index a316b2a..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-
-/**
- * A {@link DataSourceWriter} for use with structured streaming.
- *
- * Streaming queries are divided into intervals of data called epochs, with a monotonically
- * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
- */
-@InterfaceStability.Evolving
-public interface StreamWriter extends DataSourceWriter {
-  /**
-   * Commits this writing job for the specified epoch with a list of commit messages. The commit
-   * messages are collected from successful data writers and are produced by
-   * {@link DataWriter#commit()}.
-   *
-   * If this method fails (by throwing an exception), this writing job is considered to have been
-   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
-   *
-   * The execution engine may call commit() multiple times for the same epoch in some circumstances.
-   * To support exactly-once data semantics, implementations must ensure that multiple commits for
-   * the same epoch are idempotent.
-   */
-  void commit(long epochId, WriterCommitMessage[] messages);
-
-  /**
-   * Aborts this writing job because some data writers are failed and keep failing when retried, or
-   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
-   *
-   * If this method fails (by throwing an exception), the underlying data source may require manual
-   * cleanup.
-   *
-   * Unless the abort is triggered by the failure of commit, the given messages will have some
-   * null slots, as there may be only a few data writers that were committed before the abort
-   * happens, or some data writers were committed but their commit messages haven't reached the
-   * driver when the abort is triggered. So this is just a "best effort" for data sources to
-   * clean up the data left by data writers.
-   */
-  void abort(long epochId, WriterCommitMessage[] messages);
-
-  default void commit(WriterCommitMessage[] messages) {
-    throw new UnsupportedOperationException(
-        "Commit without epoch should not be called with StreamWriter");
-  }
-
-  default void abort(WriterCommitMessage[] messages) {
-    throw new UnsupportedOperationException(
-        "Abort without epoch should not be called with StreamWriter");
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
new file mode 100644
index 0000000..a4da24f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import java.io.Serializable;
+
+import org.apache.spark.TaskContext;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+
+/**
+ * A factory of {@link DataWriter} returned by
+ * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating
+ * and initializing the actual data writer at executor side.
+ *
+ * Note that, the writer factory will be serialized and sent to executors, then the data writer
+ * will be created on executors and do the actual writing. So this interface must be
+ * serializable and {@link DataWriter} doesn't need to be.
+ */
+@InterfaceStability.Evolving
+public interface StreamingDataWriterFactory extends Serializable {
+
+  /**
+   * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data
+   * object instance when sending data to the data writer, for better performance. Data writers
+   * are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
+   * list.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark write task would fail
+   * and get retried until hitting the maximum retry times.
+   *
+   * @param partitionId A unique id of the RDD partition that the returned writer will process.
+   *                    Usually Spark processes many RDD partitions at the same time,
+   *                    implementations should use the partition id to distinguish writers for
+   *                    different partitions.
+   * @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run
+   *               multiple tasks for the same partition (due to speculation or task failures,
+   *               for example).
+   * @param epochId A monotonically increasing id for streaming queries that are split into
+   *                discrete periods of execution.
+   */
+  DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId);
+}
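
A brief sketch of the streaming counterpart, reusing the hypothetical StagedFileDataWriter from the batch example above. The epoch id is folded into the staged path so that each epoch's output can be committed or aborted independently.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingDataWriterFactory;

class StagedFileStreamingWriterFactory implements StreamingDataWriterFactory {
  @Override
  public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
    // One staged file per (epoch, partition, task attempt).
    return new StagedFileDataWriter(
      "/tmp/staging/epoch-" + epochId + "-part-" + partitionId + "-task-" + taskId);
  }
}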

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
new file mode 100644
index 0000000..3fdfac5
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+/**
+ * An interface that defines how to write data to a data source for streaming processing.
+ *
+ * Streaming queries are divided into intervals of data called epochs, with a monotonically
+ * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
+ */
+@InterfaceStability.Evolving
+public interface StreamingWriteSupport {
+
+  /**
+   * Creates a writer factory which will be serialized and sent to executors.
+   *
+   * If this method fails (by throwing an exception), the action will fail and 
no Spark job will be
+   * submitted.
+   */
+  StreamingDataWriterFactory createStreamingWriterFactory();
+
+  /**
+   * Commits this writing job for the specified epoch with a list of commit messages. The commit
+   * messages are collected from successful data writers and are produced by
+   * {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and the execution engine will attempt to call
+   * {@link #abort(long, WriterCommitMessage[])}.
+   *
+   * The execution engine may call `commit` multiple times for the same epoch in some
+   * circumstances. To support exactly-once data semantics, implementations must ensure that
+   * multiple commits for the same epoch are idempotent.
+   */
+  void commit(long epochId, WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers failed and keep failing when retried, or
+   * the Spark job fails for some unknown reason, or {@link #commit(long, WriterCommitMessage[])}
+   * fails.
+   *
+   * If this method fails (by throwing an exception), the underlying data source may require
+   * manual cleanup.
+   *
+   * Unless the abort is triggered by the failure of commit, the given messages will have some
+   * null slots, as only a few data writers may have committed before the abort happens, or some
+   * data writers did commit but their commit messages haven't reached the driver when the abort
+   * is triggered. So this is just a "best effort" for data sources to clean up the data left by
+   * data writers.
+   */
+  void abort(long epochId, WriterCommitMessage[] messages);
+}
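
A rough Scala sketch of a sink satisfying this contract, reusing the hypothetical factory from the
previous example. Tracking the last committed epoch is just one simple way to keep commit
idempotent; the interface itself does not prescribe how:

import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}

class ConsoleStreamingWriteSupport extends StreamingWriteSupport {

  // Highest epoch that has already been published to the sink.
  @volatile private var lastCommittedEpoch: Long = -1L

  override def createStreamingWriterFactory(): StreamingDataWriterFactory =
    new ConsoleStreamingDataWriterFactory()

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    if (epochId > lastCommittedEpoch) {
      // Publish the epoch atomically here, e.g. move a staging location into place.
      lastCommittedEpoch = epochId
    }
    // A repeated commit for an already-published epoch falls through and is ignored,
    // which is what makes retries safe.
  }

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // messages may contain null slots for writers that never committed; clean up on a
    // best-effort basis only.
  }
}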

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
index 0cd3650..2b018c7 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
@@ -14,20 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.sources.v2.writer.streaming;
 
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.CustomMetrics;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 
 /**
- * A mix in interface for {@link DataSourceWriter}. Data source writers can 
implement this
- * interface to report custom metrics that gets reported under the
+ * A mix-in interface for {@link StreamingWriteSupport}. Data sources can implement this
+ * interface to report custom metrics that get reported under the
  * {@link org.apache.spark.sql.streaming.SinkProgress}
- *
  */
 @InterfaceStability.Evolving
-public interface SupportsCustomWriterMetrics extends DataSourceWriter {
+public interface SupportsCustomWriterMetrics extends StreamingWriteSupport {
+
   /**
    * Returns custom metrics specific to this data source.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 5b3b5c2..0cfcc45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport}
+import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, 
DataSourceOptions, DataSourceV2}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -194,7 +194,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source, 
sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val ds = cls.newInstance().asInstanceOf[DataSourceV2]
-      if (ds.isInstanceOf[ReadSupport]) {
+      if (ds.isInstanceOf[BatchReadSupportProvider]) {
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           ds = ds, conf = sparkSession.sessionState.conf)
         val pathsOption = {
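
The user-facing API is unchanged by this rename; the v2 branch above is simply taken only when the
source class implements BatchReadSupportProvider. A hypothetical usage, assuming an active
SparkSession named `spark` and a made-up source class:

// "org.example.MyV2Source" stands in for any DataSourceV2 that implements
// BatchReadSupportProvider; otherwise DataFrameReader falls back to the v1 code path.
val df = spark.read
  .format("org.example.MyV2Source")
  .option("path", "/tmp/events") // option name is illustrative only
  .load()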

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 650c917..eca2d5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -240,7 +240,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val source = cls.newInstance().asInstanceOf[DataSourceV2]
       source match {
-        case ws: WriteSupport =>
+        case provider: BatchWriteSupportProvider =>
           val options = extraOptions ++
               DataSourceV2Utils.extractSessionConfigs(source, 
df.sparkSession.sessionState.conf)
 
@@ -251,8 +251,10 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
             }
 
           } else {
-            val writer = ws.createWriter(
-              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, 
mode,
+            val writer = provider.createBatchWriteSupport(
+              UUID.randomUUID().toString,
+              df.logicalPlan.output.toStructType,
+              mode,
               new DataSourceOptions(options.asJava))
 
             if (writer.isPresent) {
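
On the source side, the hook being called above has roughly the shape sketched below. Only the
signature is taken from the call site; the parameter names, MyWritableSource and the append helper
are invented for illustration:

import java.util.Optional

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.{BatchWriteSupportProvider, DataSourceOptions, DataSourceV2}
import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
import org.apache.spark.sql.types.StructType

class MyWritableSource extends DataSourceV2 with BatchWriteSupportProvider {

  override def createBatchWriteSupport(
      queryId: String,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Optional[BatchWriteSupport] = {
    if (mode == SaveMode.Append) {
      Optional.of(appendWriteSupport(schema, options))
    } else {
      // An empty Optional means no write job gets submitted (see the isPresent check above).
      Optional.empty()
    }
  }

  // Hypothetical helper: builds the actual BatchWriteSupport for appends.
  private def appendWriteSupport(schema: StructType, options: DataSourceOptions): BatchWriteSupport = ???
}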

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index 7828298..f62f734 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -17,19 +17,22 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import scala.reflect.ClassTag
-
-import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, 
TaskContext}
+import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.v2.reader.InputPartition
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
PartitionReader, PartitionReaderFactory}
 
-class DataSourceRDDPartition[T : ClassTag](val index: Int, val inputPartition: 
InputPartition[T])
+class DataSourceRDDPartition(val index: Int, val inputPartition: 
InputPartition)
   extends Partition with Serializable
 
-class DataSourceRDD[T: ClassTag](
+// TODO: we should have 2 RDDs: an `RDD[InternalRow]` for row-based scans and an
+// `RDD[ColumnarBatch]` for columnar scans.
+class DataSourceRDD(
     sc: SparkContext,
-    @transient private val inputPartitions: Seq[InputPartition[T]])
-  extends RDD[T](sc, Nil) {
+    @transient private val inputPartitions: Seq[InputPartition],
+    partitionReaderFactory: PartitionReaderFactory,
+    columnarReads: Boolean)
+  extends RDD[InternalRow](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
     inputPartitions.zipWithIndex.map {
@@ -37,11 +40,21 @@ class DataSourceRDD[T: ClassTag](
     }.toArray
   }
 
-  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    val reader = split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition
-        .createPartitionReader()
+  private def castPartition(split: Partition): DataSourceRDDPartition = split 
match {
+    case p: DataSourceRDDPartition => p
+    case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: 
$split")
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val inputPartition = castPartition(split).inputPartition
+    val reader: PartitionReader[_] = if (columnarReads) {
+      partitionReaderFactory.createColumnarReader(inputPartition)
+    } else {
+      partitionReaderFactory.createReader(inputPartition)
+    }
+
     context.addTaskCompletionListener[Unit](_ => reader.close())
-    val iter = new Iterator[T] {
+    val iter = new Iterator[Any] {
       private[this] var valuePrepared = false
 
       override def hasNext: Boolean = {
@@ -51,7 +64,7 @@ class DataSourceRDD[T: ClassTag](
         valuePrepared
       }
 
-      override def next(): T = {
+      override def next(): Any = {
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }
@@ -59,10 +72,11 @@ class DataSourceRDD[T: ClassTag](
         reader.get()
       }
     }
-    new InterruptibleIterator(context, iter)
+    // TODO: SPARK-25083 remove the type erasure hack in data source scan
+    new InterruptibleIterator(context, 
iter.asInstanceOf[Iterator[InternalRow]])
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    
split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations()
+    castPartition(split).inputPartition.preferredLocations()
   }
 }
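
For context, a row-based reader factory that this RDD could drive might look like the sketch
below. The Range* names are invented, and it is assumed that a source which never opts into
columnar reads only needs to implement createReader:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}

// Made-up partition type: each partition is a half-open range of integers.
case class RangeInputPartition(start: Int, end: Int) extends InputPartition

object RangeReaderFactory extends PartitionReaderFactory {

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val range = partition.asInstanceOf[RangeInputPartition]
    new PartitionReader[InternalRow] {
      private var current = range.start - 1

      // next() advances and reports whether a record is available; get() returns it.
      override def next(): Boolean = { current += 1; current < range.end }

      override def get(): InternalRow = InternalRow.fromSeq(Seq(current.toLong))

      override def close(): Unit = ()
    }
  }
}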

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index a4bfc86..f7e2959 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -27,21 +27,21 @@ import 
org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, WriteSupport}
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsReportStatistics}
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, 
BatchWriteSupportProvider, DataSourceOptions, DataSourceV2}
+import org.apache.spark.sql.sources.v2.reader.{BatchReadSupport, ReadSupport, 
ScanConfigBuilder, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
  * A logical plan representing a data source v2 scan.
  *
  * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[DataSourceReader]].
- * @param userSpecifiedSchema The user-specified schema for this scan. Used to 
create fresh
- *                            [[DataSourceReader]].
+ * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
+ * @param userSpecifiedSchema The user-specified schema for this scan.
  */
 case class DataSourceV2Relation(
     source: DataSourceV2,
+    readSupport: BatchReadSupport,
     output: Seq[AttributeReference],
     options: Map[String, String],
     tableIdent: Option[TableIdentifier] = None,
@@ -58,13 +58,12 @@ case class DataSourceV2Relation(
 
   override def simpleString: String = "RelationV2 " + metadataString
 
-  def newReader(): DataSourceReader = source.createReader(options, 
userSpecifiedSchema)
+  def newWriteSupport(): BatchWriteSupport = 
source.createWriteSupport(options, schema)
 
-  def newWriter(): DataSourceWriter = source.createWriter(options, schema)
-
-  override def computeStats(): Statistics = newReader match {
+  override def computeStats(): Statistics = readSupport match {
     case r: SupportsReportStatistics =>
-      Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+      val statistics = 
r.estimateStatistics(readSupport.newScanConfigBuilder().build())
+      Statistics(sizeInBytes = 
statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
@@ -85,7 +84,8 @@ case class StreamingDataSourceV2Relation(
     output: Seq[AttributeReference],
     source: DataSourceV2,
     options: Map[String, String],
-    reader: DataSourceReader)
+    readSupport: ReadSupport,
+    scanConfigBuilder: ScanConfigBuilder)
   extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
 
   override def isStreaming: Boolean = true
@@ -99,7 +99,8 @@ case class StreamingDataSourceV2Relation(
   // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
   override def equals(other: Any): Boolean = other match {
     case other: StreamingDataSourceV2Relation =>
-      output == other.output && reader.getClass == other.reader.getClass && 
options == other.options
+      output == other.output && readSupport.getClass == 
other.readSupport.getClass &&
+        options == other.options
     case _ => false
   }
 
@@ -107,9 +108,10 @@ case class StreamingDataSourceV2Relation(
     Seq(output, source, options).hashCode()
   }
 
-  override def computeStats(): Statistics = reader match {
+  override def computeStats(): Statistics = readSupport match {
     case r: SupportsReportStatistics =>
-      Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+      val statistics = r.estimateStatistics(scanConfigBuilder.build())
+      Statistics(sizeInBytes = 
statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
@@ -117,19 +119,19 @@ case class StreamingDataSourceV2Relation(
 
 object DataSourceV2Relation {
   private implicit class SourceHelpers(source: DataSourceV2) {
-    def asReadSupport: ReadSupport = {
+    def asReadSupportProvider: BatchReadSupportProvider = {
       source match {
-        case support: ReadSupport =>
-          support
+        case provider: BatchReadSupportProvider =>
+          provider
         case _ =>
           throw new AnalysisException(s"Data source is not readable: $name")
       }
     }
 
-    def asWriteSupport: WriteSupport = {
+    def asWriteSupportProvider: BatchWriteSupportProvider = {
       source match {
-        case support: WriteSupport =>
-          support
+        case provider: BatchWriteSupportProvider =>
+          provider
         case _ =>
           throw new AnalysisException(s"Data source is not writable: $name")
       }
@@ -144,23 +146,26 @@ object DataSourceV2Relation {
       }
     }
 
-    def createReader(
+    def createReadSupport(
         options: Map[String, String],
-        userSpecifiedSchema: Option[StructType]): DataSourceReader = {
+        userSpecifiedSchema: Option[StructType]): BatchReadSupport = {
       val v2Options = new DataSourceOptions(options.asJava)
       userSpecifiedSchema match {
         case Some(s) =>
-          asReadSupport.createReader(s, v2Options)
+          asReadSupportProvider.createBatchReadSupport(s, v2Options)
         case _ =>
-          asReadSupport.createReader(v2Options)
+          asReadSupportProvider.createBatchReadSupport(v2Options)
       }
     }
 
-    def createWriter(
+    def createWriteSupport(
         options: Map[String, String],
-        schema: StructType): DataSourceWriter = {
-      val v2Options = new DataSourceOptions(options.asJava)
-      asWriteSupport.createWriter(UUID.randomUUID.toString, schema, 
SaveMode.Append, v2Options).get
+        schema: StructType): BatchWriteSupport = {
+      asWriteSupportProvider.createBatchWriteSupport(
+        UUID.randomUUID().toString,
+        schema,
+        SaveMode.Append,
+        new DataSourceOptions(options.asJava)).get
     }
   }
 
@@ -169,15 +174,16 @@ object DataSourceV2Relation {
       options: Map[String, String],
       tableIdent: Option[TableIdentifier] = None,
       userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
-    val reader = source.createReader(options, userSpecifiedSchema)
+    val readSupport = source.createReadSupport(options, userSpecifiedSchema)
+    val output = readSupport.fullSchema().toAttributes
     val ident = tableIdent.orElse(tableFromOptions(options))
     DataSourceV2Relation(
-      source, reader.readSchema().toAttributes, options, ident, 
userSpecifiedSchema)
+      source, readSupport, output, options, ident, userSpecifiedSchema)
   }
 
   private def tableFromOptions(options: Map[String, String]): 
Option[TableIdentifier] = {
     options
-        .get(DataSourceOptions.TABLE_KEY)
-        .map(TableIdentifier(_, options.get(DataSourceOptions.DATABASE_KEY)))
+      .get(DataSourceOptions.TABLE_KEY)
+      .map(TableIdentifier(_, options.get(DataSourceOptions.DATABASE_KEY)))
   }
 }
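
As a sketch of the statistics path above: a read support that wants better-than-default sizing
mixes in SupportsReportStatistics and answers estimateStatistics for the built ScanConfig. Only
sizeInBytes() is visible in this diff; numRows() is assumed to be the other accessor on the v2
Statistics interface:

import java.util.OptionalLong

import org.apache.spark.sql.sources.v2.reader.{ScanConfig, Statistics, SupportsReportStatistics}

// Abstract mix-in sketch: report a fixed 10 MB estimate regardless of the ScanConfig.
trait FixedSizeStatistics extends SupportsReportStatistics {
  override def estimateStatistics(config: ScanConfig): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(10L * 1024 * 1024)
    override def numRows(): OptionalLong = OptionalLong.empty()
  }
}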

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index c8494f9..04a9773 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -28,8 +26,7 @@ import org.apache.spark.sql.execution.{ColumnarBatchScan, 
LeafExecNode, WholeSta
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
-import org.apache.spark.sql.vectorized.ColumnarBatch
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
 
 /**
  * Physical plan node for scanning data from a data source.
@@ -39,7 +36,8 @@ case class DataSourceV2ScanExec(
     @transient source: DataSourceV2,
     @transient options: Map[String, String],
     @transient pushedFilters: Seq[Expression],
-    @transient reader: DataSourceReader)
+    @transient readSupport: ReadSupport,
+    @transient scanConfig: ScanConfig)
   extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
 
   override def simpleString: String = "ScanV2 " + metadataString
@@ -47,7 +45,8 @@ case class DataSourceV2ScanExec(
   // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
   override def equals(other: Any): Boolean = other match {
     case other: DataSourceV2ScanExec =>
-      output == other.output && reader.getClass == other.reader.getClass && 
options == other.options
+      output == other.output && readSupport.getClass == 
other.readSupport.getClass &&
+        options == other.options
     case _ => false
   }
 
@@ -55,36 +54,39 @@ case class DataSourceV2ScanExec(
     Seq(output, source, options).hashCode()
   }
 
-  override def outputPartitioning: physical.Partitioning = reader match {
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() && 
batchPartitions.size == 1 =>
-      SinglePartition
-
-    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && 
partitions.size == 1 =>
-      SinglePartition
-
-    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 
1 =>
+  override def outputPartitioning: physical.Partitioning = readSupport match {
+    case _ if partitions.length == 1 =>
       SinglePartition
 
     case s: SupportsReportPartitioning =>
       new DataSourcePartitioning(
-        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
+        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> 
a.name)))
 
     case _ => super.outputPartitioning
   }
 
-  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
-    reader.planInputPartitions().asScala
+  private lazy val partitions: Seq[InputPartition] = 
readSupport.planInputPartitions(scanConfig)
+
+  private lazy val readerFactory = readSupport match {
+    case r: BatchReadSupport => r.createReaderFactory(scanConfig)
+    case r: MicroBatchReadSupport => r.createReaderFactory(scanConfig)
+    case r: ContinuousReadSupport => 
r.createContinuousReaderFactory(scanConfig)
+    case _ => throw new IllegalStateException("unknown read support: " + 
readSupport)
   }
 
-  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = 
reader match {
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      assert(!reader.isInstanceOf[ContinuousReader],
-        "continuous stream reader does not support columnar read yet.")
-      r.planBatchInputPartitions().asScala
+  // TODO: clean this up when we have a dedicated scan plan for continuous streaming.
+  override val supportsBatch: Boolean = {
+    require(partitions.forall(readerFactory.supportColumnarReads) ||
+      !partitions.exists(readerFactory.supportColumnarReads),
+      "Cannot mix row-based and columnar input partitions.")
+
+    partitions.exists(readerFactory.supportColumnarReads)
   }
 
-  private lazy val inputRDD: RDD[InternalRow] = reader match {
-    case _: ContinuousReader =>
+  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
+    case _: ContinuousReadSupport =>
+      assert(!supportsBatch,
+        "continuous stream reader does not support columnar read yet.")
       EpochCoordinatorRef.get(
           
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
@@ -93,22 +95,17 @@ case class DataSourceV2ScanExec(
         sparkContext,
         sqlContext.conf.continuousStreamingExecutorQueueSize,
         sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
-        partitions).asInstanceOf[RDD[InternalRow]]
-
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, 
batchPartitions).asInstanceOf[RDD[InternalRow]]
+        partitions,
+        schema,
+        readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
 
     case _ =>
-      new DataSourceRDD(sparkContext, 
partitions).asInstanceOf[RDD[InternalRow]]
+      new DataSourceRDD(
+        sparkContext, partitions, 
readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch)
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
 
-  override val supportsBatch: Boolean = reader match {
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() => true
-    case _ => false
-  }
-
   override protected def needsUnsafeRowConversion: Boolean = false
 
   override protected def doExecute(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 6daaa4c..fe713ff 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -26,8 +26,8 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Rep
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, 
WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
 
 object DataSourceV2Strategy extends Strategy {
 
@@ -37,9 +37,9 @@ object DataSourceV2Strategy extends Strategy {
    * @return pushed filter and post-scan filters.
    */
   private def pushFilters(
-      reader: DataSourceReader,
+      configBuilder: ScanConfigBuilder,
       filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
-    reader match {
+    configBuilder match {
       case r: SupportsPushDownCatalystFilters =>
         val postScanFilters = r.pushCatalystFilters(filters.toArray)
         val pushedFilters = r.pushedCatalystFilters()
@@ -76,41 +76,43 @@ object DataSourceV2Strategy extends Strategy {
   /**
    * Applies column pruning to the data source, w.r.t. the references of the 
given expressions.
    *
-   * @return new output attributes after column pruning.
+   * @return the created `ScanConfig` (since column pruning is the last step of operator
+   *         pushdown), and new output attributes after column pruning.
    */
   // TODO: nested column pruning.
   private def pruneColumns(
-      reader: DataSourceReader,
+      configBuilder: ScanConfigBuilder,
       relation: DataSourceV2Relation,
-      exprs: Seq[Expression]): Seq[AttributeReference] = {
-    reader match {
+      exprs: Seq[Expression]): (ScanConfig, Seq[AttributeReference]) = {
+    configBuilder match {
       case r: SupportsPushDownRequiredColumns =>
         val requiredColumns = AttributeSet(exprs.flatMap(_.references))
         val neededOutput = relation.output.filter(requiredColumns.contains)
         if (neededOutput != relation.output) {
           r.pruneColumns(neededOutput.toStructType)
+          val config = r.build()
           val nameToAttr = 
relation.output.map(_.name).zip(relation.output).toMap
-          r.readSchema().toAttributes.map {
+          config -> config.readSchema().toAttributes.map {
             // We have to keep the attribute id during transformation.
             a => a.withExprId(nameToAttr(a.name).exprId)
           }
         } else {
-          relation.output
+          r.build() -> relation.output
         }
 
-      case _ => relation.output
+      case _ => configBuilder.build() -> relation.output
     }
   }
 
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
-      val reader = relation.newReader()
+      val configBuilder = relation.readSupport.newScanConfigBuilder()
       // `pushedFilters` will be pushed down and evaluated in the underlying 
data sources.
       // `postScanFilters` need to be evaluated after the scan.
       // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet 
row group filter.
-      val (pushedFilters, postScanFilters) = pushFilters(reader, filters)
-      val output = pruneColumns(reader, relation, project ++ postScanFilters)
+      val (pushedFilters, postScanFilters) = pushFilters(configBuilder, 
filters)
+      val (config, output) = pruneColumns(configBuilder, relation, project ++ 
postScanFilters)
       logInfo(
         s"""
            |Pushing operators to ${relation.source.getClass}
@@ -120,7 +122,12 @@ object DataSourceV2Strategy extends Strategy {
          """.stripMargin)
 
       val scan = DataSourceV2ScanExec(
-        output, relation.source, relation.options, pushedFilters, reader)
+        output,
+        relation.source,
+        relation.options,
+        pushedFilters,
+        relation.readSupport,
+        config)
 
       val filterCondition = postScanFilters.reduceLeftOption(And)
       val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
@@ -129,22 +136,26 @@ object DataSourceV2Strategy extends Strategy {
       ProjectExec(project, withFilter) :: Nil
 
     case r: StreamingDataSourceV2Relation =>
+      // TODO: support operator pushdown for streaming data sources.
+      val scanConfig = r.scanConfigBuilder.build()
       // ensure there is a projection, which will produce unsafe rows required 
by some operators
       ProjectExec(r.output,
-        DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, 
r.reader)) :: Nil
+        DataSourceV2ScanExec(
+          r.output, r.source, r.options, r.pushedFilters, r.readSupport, 
scanConfig)) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
 
     case AppendData(r: DataSourceV2Relation, query, _) =>
-      WriteToDataSourceV2Exec(r.newWriter(), planLater(query)) :: Nil
+      WriteToDataSourceV2Exec(r.newWriteSupport(), planLater(query)) :: Nil
 
     case WriteToContinuousDataSource(writer, query) =>
       WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
 
     case Repartition(1, false, child) =>
-      val isContinuous = child.collectFirst {
-        case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
+      val isContinuous = child.find {
+        case s: StreamingDataSourceV2Relation => 
s.readSupport.isInstanceOf[ContinuousReadSupport]
+        case _ => false
       }.isDefined
 
       if (isContinuous) {
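
To make the new pushdown flow concrete, here is a sketch of a ScanConfigBuilder that accepts
column pruning. The Simple* names are made up, and it is assumed that readSchema() is the only
member a ScanConfig must provide, which is all this strategy relies on; filter pushdown would be
handled the same way via SupportsPushDownFilters or SupportsPushDownCatalystFilters:

import org.apache.spark.sql.sources.v2.reader.{ScanConfig, ScanConfigBuilder, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Made-up ScanConfig that only carries the pruned schema; a real source would also
// record pushed filters and whatever else its readers need.
case class SimpleScanConfig(schema: StructType) extends ScanConfig {
  override def readSchema(): StructType = schema
}

class SimpleScanConfigBuilder(fullSchema: StructType)
  extends ScanConfigBuilder with SupportsPushDownRequiredColumns {

  private var requiredSchema: StructType = fullSchema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }

  override def build(): ScanConfig = SimpleScanConfig(requiredSchema)
}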

