Repository: spark
Updated Branches:
  refs/heads/master 79a4dab62 -> c7307acda


[SPARK-15689][SQL] data source v2 read path

## What changes were proposed in this pull request?

This PR adds the infrastructure for data source v2, and implements the features 
which Spark already has in data source v1, i.e. column pruning, filter push 
down, catalyst expression filter push down, InternalRow scan, schema inference, 
and data size report. The write path is excluded to keep this PR from growing 
too big, and will be added in a follow-up PR.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenc...@databricks.com>

Closes #19136 from cloud-fan/data-source-v2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7307acd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7307acd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7307acd

Branch: refs/heads/master
Commit: c7307acdad881d98857f0b63328fe9c420ddf9c3
Parents: 79a4dab
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Sep 15 22:18:36 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Sep 15 22:18:36 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/sources/v2/DataSourceV2.java      |  31 +++
 .../sql/sources/v2/DataSourceV2Options.java     |  52 +++++
 .../spark/sql/sources/v2/ReadSupport.java       |  38 +++
 .../sql/sources/v2/ReadSupportWithSchema.java   |  47 ++++
 .../spark/sql/sources/v2/reader/DataReader.java |  40 ++++
 .../sources/v2/reader/DataSourceV2Reader.java   |  67 ++++++
 .../spark/sql/sources/v2/reader/ReadTask.java   |  48 ++++
 .../spark/sql/sources/v2/reader/Statistics.java |  32 +++
 .../reader/SupportsPushDownCatalystFilters.java |  43 ++++
 .../v2/reader/SupportsPushDownFilters.java      |  38 +++
 .../reader/SupportsPushDownRequiredColumns.java |  42 ++++
 .../v2/reader/SupportsReportStatistics.java     |  33 +++
 .../v2/reader/SupportsScanUnsafeRow.java        |  49 ++++
 .../org/apache/spark/sql/DataFrameReader.scala  |  46 +++-
 .../spark/sql/execution/SparkPlanner.scala      |   2 +
 .../datasources/v2/DataSourceRDD.scala          |  68 ++++++
 .../datasources/v2/DataSourceV2Relation.scala   |  40 ++++
 .../datasources/v2/DataSourceV2ScanExec.scala   |  89 +++++++
 .../datasources/v2/DataSourceV2Strategy.scala   |  93 ++++++++
 .../sources/v2/JavaAdvancedDataSourceV2.java    | 130 +++++++++++
 .../v2/JavaSchemaRequiredDataSource.java        |  54 +++++
 .../sql/sources/v2/JavaSimpleDataSourceV2.java  |  86 +++++++
 .../sources/v2/JavaUnsafeRowDataSourceV2.java   |  88 +++++++
 .../sources/v2/DataSourceV2OptionsSuite.scala   |  40 ++++
 .../sql/sources/v2/DataSourceV2Suite.scala      | 229 +++++++++++++++++++
 25 files changed, 1518 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
new file mode 100644
index 0000000..dbcbe32
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * The base interface for data source v2. Implementations must have a public, no-argument
+ * constructor.
+ *
+ * Note that this is an empty interface; data source implementations should mix in at least one of
+ * the plug-in interfaces like {@link ReadSupport}. Otherwise it's just a dummy data source which
+ * can neither be read nor written.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2 {}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
new file mode 100644
index 0000000..9a89c81
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An immutable string-to-string map with case-insensitive keys, used to represent data source
+ * options. Keys are lower-cased with {@link Locale#ROOT} both on construction and on lookup.
+ */
+@InterfaceStability.Evolving
+public class DataSourceV2Options {
+  private final Map<String, String> keyLowerCasedMap;
+
+  private String toLowerCase(String key) {
+    return key.toLowerCase(Locale.ROOT);
+  }
+
+  public DataSourceV2Options(Map<String, String> originalMap) {
+    keyLowerCasedMap = new HashMap<>(originalMap.size());
+    for (Map.Entry<String, String> entry : originalMap.entrySet()) {
+      keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
+    }
+  }
+
+  /**
+   * Returns the value mapped to the given key, matched case-insensitively, or empty if absent.
+   */
+  public Optional<String> get(String key) {
+    return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
new file mode 100644
index 0000000..ab5254a
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide the ability to read, i.e. scan, the data from the data source.
+ */
+@InterfaceStability.Evolving
+public interface ReadSupport {
+
+  /**
+   * Creates a {@link DataSourceV2Reader} to scan the data from this data source.
+   *
+   * @param options the options for this data source reader, as an immutable, case-insensitive
+   *                string-to-string map.
+   * @return a reader that implements the actual read logic.
+   */
+  DataSourceV2Reader createReader(DataSourceV2Options options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
new file mode 100644
index 0000000..c13aeca
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide the ability to read, i.e. scan, the data from the data source.
+ *
+ * This is a variant of {@link ReadSupport} that accepts a user-specified schema when reading data.
+ * A data source can implement both {@link ReadSupport} and {@link ReadSupportWithSchema} if it
+ * supports both schema inference and a user-specified schema.
+ */
+@InterfaceStability.Evolving
+public interface ReadSupportWithSchema {
+
+  /**
+   * Creates a {@link DataSourceV2Reader} to scan the data from this data source.
+   *
+   * @param schema the full schema of this data source reader. The full schema usually maps to the
+   *               physical schema of the underlying storage of this data source reader, e.g.
+   *               CSV files, JSON files, etc., although this reader may not read data with the
+   *               full schema, as column pruning or other optimizations may happen.
+   * @param options the options for this data source reader, as an immutable, case-insensitive
+   *                string-to-string map.
+   * @return a reader that implements the actual read logic.
+   */
+  DataSourceV2Reader createReader(StructType schema, DataSourceV2Options 
options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
new file mode 100644
index 0000000..cfafc1a
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Closeable;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A data reader, returned by {@link ReadTask#createReader()}, that is responsible for outputting
+ * data for an RDD partition.
+ */
+@InterfaceStability.Evolving
+public interface DataReader<T> extends Closeable {
+
+  /**
+   * Proceeds to the next record. Returns false if there are no more records.
+   */
+  boolean next();
+
+  /**
+   * Returns the current record. This method should return the same value until `next` is called.
+   */
+  T get();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
new file mode 100644
index 0000000..48feb04
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source reader that is returned by
+ * {@link ReadSupport#createReader(DataSourceV2Options)} or
+ * {@link ReadSupportWithSchema#createReader(StructType, DataSourceV2Options)}.
+ * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
+ * logic should be delegated to {@link ReadTask}s that are returned by {@link #createReadTasks()}.
+ *
+ * There are mainly 3 kinds of query optimizations:
+ *   1. Operator push-down. E.g., filter push-down, required columns push-down (aka column
+ *      pruning), etc. These push-down interfaces are named like `SupportsPushDownXXX`.
+ *   2. Information reporting. E.g., statistics reporting, ordering reporting, etc. These
+ *      reporting interfaces are named like `SupportsReportXXX`.
+ *   3. Special scans. E.g., columnar scan, unsafe row scan, etc. These scan interfaces are named
+ *      like `SupportsScanXXX`.
+ *
+ * Spark first applies all operator push-down optimizations that this data source supports. Then
+ * Spark collects information this data source reported for further optimizations. Finally Spark
+ * issues the scan request and does the actual data reading.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Reader {
+
+  /**
+   * Returns the actual schema of this data source reader, which may be different from the physical
+   * schema of the underlying storage, as column pruning or other optimizations may happen.
+   */
+  StructType readSchema();
+
+  /**
+   * Returns a list of read tasks. Each task is responsible for outputting data for one RDD
+   * partition. That means the number of tasks returned here is the same as the number of RDD
+   * partitions this scan outputs.
+   *
+   * Note that this may not be a full scan if the data source reader mixes in other optimization
+   * interfaces like column pruning, filter push-down, etc. These optimizations are applied before
+   * Spark issues the scan request.
+   */
+  List<ReadTask<Row>> createReadTasks();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
new file mode 100644
index 0000000..7885bfc
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A read task, returned by {@link DataSourceV2Reader#createReadTasks()}, that is responsible for
+ * creating the actual data reader. The relationship between {@link ReadTask} and {@link DataReader}
+ * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
+ *
+ * Note that the read task will be serialized and sent to executors, then the data reader will be
+ * created on executors and do the actual reading.
+ */
+@InterfaceStability.Evolving
+public interface ReadTask<T> extends Serializable {
+
+  /**
+   * The preferred locations where this read task can run faster; Spark does not guarantee that it
+   * will run there, so implementations should be able to run on any location. Each location is
+   * the host name of an executor. The default implementation returns an empty array.
+   */
+  default String[] preferredLocations() {
+    return new String[0];
+  }
+
+  /**
+   * Returns a data reader to do the actual reading work for this read task.
+   */
+  DataReader<T> createReader();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
new file mode 100644
index 0000000..e8cd7ad
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.OptionalLong;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * Statistics of a data source (size in bytes and number of rows, each optional), as returned by
+ * {@link SupportsReportStatistics#getStatistics()}.
+ */
+@InterfaceStability.Evolving
+public interface Statistics {
+  OptionalLong sizeInBytes();
+  OptionalLong numRows();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
new file mode 100644
index 0000000..19d7062
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to push down arbitrary expressions as predicates to the data source.
+ * This is an experimental and unstable interface, as {@link Expression} is not public and may
+ * change in future Spark versions.
+ *
+ * Note that, if data source readers implement both this interface and
+ * {@link SupportsPushDownFilters}, Spark will ignore {@link SupportsPushDownFilters} and only
+ * process this interface.
+ */
+@Experimental
+@InterfaceStability.Unstable
+public interface SupportsPushDownCatalystFilters {
+
+  /**
+   * Pushes down the given catalyst filters to this reader, and returns the filters that are
+   * unsupported.
+   */
+  Expression[] pushCatalystFilters(Expression[] filters);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
new file mode 100644
index 0000000..d4b509e
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to push down filters to the data source and reduce the size of the data to be read.
+ *
+ * Note that, if data source readers implement both this interface and
+ * {@link SupportsPushDownCatalystFilters}, Spark will ignore this interface and only process
+ * {@link SupportsPushDownCatalystFilters}.
+ */
+@InterfaceStability.Evolving
+public interface SupportsPushDownFilters {
+
+  /**
+   * Pushes down the given filters, and returns the filters that are unsupported.
+   */
+  Filter[] pushFilters(Filter[] filters);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
new file mode 100644
index 0000000..fe0ac8e
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to push down required columns to the data source and only read these columns during
+ * the scan, to reduce the size of the data to be read.
+ */
+@InterfaceStability.Evolving
+public interface SupportsPushDownRequiredColumns {
+
+  /**
+   * Applies column pruning w.r.t. the given requiredSchema.
+   *
+   * Implementations should try their best to prune the unnecessary columns or nested fields, but
+   * it's also OK to do the pruning partially, e.g., a data source may not be able to prune nested
+   * fields, and only prune top-level columns.
+   *
+   * Note that data source readers should update {@link DataSourceV2Reader#readSchema()} after
+   * applying column pruning.
+   */
+  void pruneColumns(StructType requiredSchema);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
new file mode 100644
index 0000000..c019d2f
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to report statistics to Spark.
+ */
+@InterfaceStability.Evolving
+public interface SupportsReportStatistics {
+
+  /**
+   * Returns the basic statistics of this data source.
+   */
+  Statistics getStatistics();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
new file mode 100644
index 0000000..829f9a0
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to output {@link UnsafeRow} directly and avoid the row copy at Spark side.
+ * This is an experimental and unstable interface, as {@link UnsafeRow} is not public and may get
+ * changed in the future Spark versions.
+ */
+@Experimental
+@InterfaceStability.Unstable
+public interface SupportsScanUnsafeRow extends DataSourceV2Reader {
+
+  /**
+   * Always fails: readers implementing this interface provide their read tasks through
+   * {@link #createUnsafeRowReadTasks()} instead.
+   *
+   * @throws IllegalStateException on every call
+   */
+  @Override
+  default List<ReadTask<Row>> createReadTasks() {
+    throw new IllegalStateException(
+      "createReadTasks should not be called with SupportsScanUnsafeRow.");
+  }
+
+  /**
+   * Similar to {@link DataSourceV2Reader#createReadTasks()}, but returns data in unsafe row
+   * format.
+   */
+  List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index c69acc4..78b668c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -32,6 +32,8 @@ import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser
 import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -180,13 +182,43 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         "read files of Hive data source directly.")
     }
 
-    sparkSession.baseRelationToDataFrame(
-      DataSource.apply(
-        sparkSession,
-        paths = paths,
-        userSpecifiedSchema = userSpecifiedSchema,
-        className = source,
-        options = extraOptions.toMap).resolveRelation())
+    val cls = DataSource.lookupDataSource(source)
+    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
+      // Instantiate the data source exactly once; the same instance is used to create the
+      // reader and to build the error messages below.
+      val dataSource = cls.newInstance()
+      val options = new DataSourceV2Options(extraOptions.asJava)
+
+      val reader = (dataSource, userSpecifiedSchema) match {
+        case (ds: ReadSupportWithSchema, Some(schema)) =>
+          ds.createReader(schema, options)
+
+        case (ds: ReadSupport, None) =>
+          ds.createReader(options)
+
+        case (_: ReadSupportWithSchema, None) =>
+          throw new AnalysisException(s"A schema needs to be specified when using $dataSource.")
+
+        case (ds: ReadSupport, Some(schema)) =>
+          // The source cannot take a schema; accept the user-specified one only when it is
+          // equal to the schema the reader reports itself.
+          val inferredReader = ds.createReader(options)
+          if (inferredReader.readSchema() != schema) {
+            throw new AnalysisException(s"$ds does not allow user-specified schemas.")
+          }
+          inferredReader
+
+        case _ =>
+          throw new AnalysisException(s"$cls does not support data reading.")
+      }
+
+      Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
+    } else {
+      // Code path for data source v1.
+      sparkSession.baseRelationToDataFrame(
+        DataSource.apply(
+          sparkSession,
+          paths = paths,
+          userSpecifiedSchema = userSpecifiedSchema,
+          className = source,
+          options = extraOptions.toMap).resolveRelation())
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 4e718d6..b143d44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
 import org.apache.spark.sql.internal.SQLConf
 
 class SparkPlanner(
@@ -35,6 +36,7 @@ class SparkPlanner(
   def strategies: Seq[Strategy] =
     experimentalMethods.extraStrategies ++
       extraPlanningStrategies ++ (
+      DataSourceV2Strategy ::
       FileSourceStrategy ::
       DataSourceStrategy(conf) ::
       SpecialLimits ::

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
new file mode 100644
index 0000000..b8fe5ac
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.sources.v2.reader.ReadTask
+
+/**
+ * A partition of [[DataSourceRDD]] that carries the [[ReadTask]] used to produce its rows.
+ *
+ * @param index position of this partition within the RDD
+ * @param readTask task whose reader yields the rows of this partition
+ */
+class DataSourceRDDPartition(
+    val index: Int,
+    val readTask: ReadTask[UnsafeRow])
+  extends Partition with Serializable
+
+/**
+ * An RDD with one partition per [[ReadTask]]. Computing a partition creates a reader from the
+ * partition's task and iterates over the rows that reader produces.
+ */
+class DataSourceRDD(
+    sc: SparkContext,
+    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  /** Builds one partition per read task, numbered in list order. */
+  override protected def getPartitions: Array[Partition] = {
+    readTasks.asScala.iterator.zipWithIndex.map { case (task, idx) =>
+      new DataSourceRDDPartition(idx, task)
+    }.toArray[Partition]
+  }
+
+  /**
+   * Creates the reader for `split`, registers its `close()` as a task completion listener and
+   * exposes the reader's `next()`/`get()` protocol as an interruptible Scala iterator.
+   */
+  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+    val task = split.asInstanceOf[DataSourceRDDPartition].readTask
+    val dataReader = task.createReader()
+    context.addTaskCompletionListener(_ => dataReader.close())
+
+    val rows = new Iterator[UnsafeRow] {
+      // True when the reader has been advanced to a row that was not handed out yet.
+      private[this] var rowPending = false
+
+      override def hasNext: Boolean = {
+        // Advance the reader only when no fetched row is waiting to be consumed.
+        rowPending = rowPending || dataReader.next()
+        rowPending
+      }
+
+      override def next(): UnsafeRow = {
+        if (hasNext) {
+          rowPending = false
+          dataReader.get()
+        } else {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+      }
+    }
+    new InterruptibleIterator(context, rows)
+  }
+
+  /** Returns the preferred locations reported by the read task of `split`. */
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val task = split.asInstanceOf[DataSourceRDDPartition].readTask
+    task.preferredLocations()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
new file mode 100644
index 0000000..3c9b598
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceV2Reader, SupportsReportStatistics}
+
+/**
+ * Logical leaf node that wraps a [[DataSourceV2Reader]].
+ *
+ * @param output attributes produced by this relation
+ * @param reader the data source v2 reader backing this relation
+ */
+case class DataSourceV2Relation(
+    output: Seq[AttributeReference],
+    reader: DataSourceV2Reader) extends LeafNode {
+
+  /**
+   * Uses the size reported by the reader when it implements [[SupportsReportStatistics]] and
+   * provides a value; otherwise falls back to `conf.defaultSizeInBytes`.
+   */
+  override def computeStats(): Statistics = {
+    val estimatedSize: BigInt = reader match {
+      case r: SupportsReportStatistics =>
+        r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes)
+      case _ =>
+        conf.defaultSizeInBytes
+    }
+    Statistics(sizeInBytes = estimatedSize)
+  }
+}
+
+object DataSourceV2Relation {
+  /** Creates a relation whose output attributes are derived from the reader's read schema. */
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+    val attributes = reader.readSchema().toAttributes
+    DataSourceV2Relation(attributes, reader)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
new file mode 100644
index 0000000..7999c0c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Physical leaf node that scans a data source v2 reader.
+ *
+ * @param fullOutput all attributes of the relation being scanned
+ * @param reader the reader that creates the read tasks
+ * @param readSchema schema of the rows produced by the scan; its field names select the
+ *                   attributes of `fullOutput` that form the output of this node
+ * @param filters see the TODO on the parameter list
+ * @param hashPartitionKeys see the TODO on the parameter list
+ */
+case class DataSourceV2ScanExec(
+    fullOutput: Array[AttributeReference],
+    @transient reader: DataSourceV2Reader,
+    // TODO: these 3 parameters are only used to determine the equality of the scan node, however,
+    // the reader also have this information, and ideally we can just rely on the equality of the
+    // reader. The only concern is, the reader implementation is outside of Spark and we have no
+    // control.
+    readSchema: StructType,
+    @transient filters: ExpressionSet,
+    hashPartitionKeys: Seq[String]) extends LeafExecNode {
+
+  /**
+   * The attributes of `fullOutput` named by `readSchema`, in `readSchema` order.
+   *
+   * @throws IllegalStateException if `readSchema` names a column missing from `fullOutput`
+   */
+  def output: Seq[Attribute] = readSchema.map(_.name).map { name =>
+    fullOutput.find(_.name == name).getOrElse {
+      // Fail with the offending column instead of an anonymous `None.get`.
+      throw new IllegalStateException(
+        s"Column '$name' of the read schema is not part of the relation output: " +
+          fullOutput.map(_.name).mkString("[", ", ", "]"))
+    }
+  }
+
+  override def references: AttributeSet = AttributeSet.empty
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  /**
+   * Builds a [[DataSourceRDD]] from the reader's tasks and counts every produced row in the
+   * "numOutputRows" metric. Readers that do not implement [[SupportsScanUnsafeRow]] have their
+   * `Row` tasks wrapped so that they emit `UnsafeRow`s.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {
+    val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
+      case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
+      case _ =>
+        // Loop-invariant: ask the reader for its schema once rather than once per read task.
+        val schema = reader.readSchema()
+        reader.createReadTasks().asScala.map { task =>
+          (new RowToUnsafeRowReadTask(task, schema): ReadTask[UnsafeRow])
+        }.asJava
+    }
+
+    val inputRDD = new DataSourceRDD(sparkContext, readTasks)
+      .asInstanceOf[RDD[InternalRow]]
+    val numOutputRows = longMetric("numOutputRows")
+    inputRDD.map { r =>
+      numOutputRows += 1
+      r
+    }
+  }
+}
+
+/**
+ * Adapts a `Row`-producing [[ReadTask]] into one that produces `UnsafeRow`s.
+ *
+ * @param rowReadTask the wrapped read task
+ * @param schema schema used to build the encoder that converts each row
+ */
+class RowToUnsafeRowReadTask(rowReadTask: ReadTask[Row], schema: StructType)
+  extends ReadTask[UnsafeRow] {
+
+  /** Delegates to the wrapped task. */
+  override def preferredLocations: Array[String] = {
+    rowReadTask.preferredLocations
+  }
+
+  /** Wraps the reader of the underlying task with a `Row` to `UnsafeRow` conversion. */
+  override def createReader: DataReader[UnsafeRow] = {
+    val underlying = rowReadTask.createReader
+    val rowEncoder = RowEncoder.apply(schema)
+    new RowToUnsafeDataReader(underlying, rowEncoder)
+  }
+}
+
+/**
+ * A [[DataReader]] that converts each `Row` of the wrapped reader with the given encoder.
+ *
+ * @param rowReader the wrapped reader
+ * @param encoder encoder applied to every row returned by `rowReader`
+ */
+class RowToUnsafeDataReader(rowReader: DataReader[Row], encoder: ExpressionEncoder[Row])
+  extends DataReader[UnsafeRow] {
+
+  /** Advances the wrapped reader. */
+  override def next: Boolean = {
+    rowReader.next
+  }
+
+  /** Encodes the current row of the wrapped reader. */
+  override def get: UnsafeRow = {
+    val currentRow = rowReader.get
+    encoder.toRow(currentRow).asInstanceOf[UnsafeRow]
+  }
+
+  /** Closes the wrapped reader. */
+  override def close(): Unit = {
+    rowReader.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
new file mode 100644
index 0000000..b80f695
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * Planning strategy that turns a [[DataSourceV2Relation]], together with the projections and
+ * filters on top of it, into a [[DataSourceV2ScanExec]] wrapped in the filter and project
+ * operators that still have to be evaluated by Spark.
+ */
+object DataSourceV2Strategy extends Strategy {
+  // TODO: write path
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(projects, filters, DataSourceV2Relation(output, reader)) =>
+      val stayUpFilters = pushDownFilters(reader, filters)
+
+      // Only the columns referenced by the projection or by the filters that stay in Spark are
+      // required. They are looked up in the relation output to match the original case of the
+      // attributes.
+      // TODO: nested fields pruning
+      val referenced = AttributeSet(projects.flatMap(_.references)) ++
+        AttributeSet(stayUpFilters.flatMap(_.references))
+      val originalAttrs = AttributeMap(output.zip(output))
+      val requiredColumns = referenced.toSeq.map(originalAttrs)
+      reader match {
+        case r: SupportsPushDownRequiredColumns =>
+          r.pruneColumns(requiredColumns.toStructType)
+        case _ =>
+      }
+
+      // The read schema is queried only after the push-down calls above have been made.
+      val scan = DataSourceV2ScanExec(
+        output.toArray,
+        reader,
+        reader.readSchema(),
+        ExpressionSet(filters),
+        Nil)
+
+      val filtered: SparkPlan = stayUpFilters.reduceLeftOption(And) match {
+        case Some(condition) => FilterExec(condition, scan)
+        case None => scan
+      }
+
+      // Skip the projection when it would reproduce the child's output unchanged.
+      val planned = if (projects == filtered.output) {
+        filtered
+      } else {
+        ProjectExec(projects, filtered)
+      }
+
+      planned :: Nil
+
+    case _ => Nil
+  }
+
+  /**
+   * Offers `filters` to the reader and returns the predicates Spark still has to evaluate.
+   * Readers supporting neither push-down interface leave every predicate to Spark.
+   */
+  private def pushDownFilters(
+      reader: DataSourceV2Reader,
+      filters: Seq[Expression]): Seq[Expression] = reader match {
+    case r: SupportsPushDownCatalystFilters =>
+      r.pushCatalystFilters(filters.toArray)
+
+    case r: SupportsPushDownFilters =>
+      // Catalyst predicates paired with their translated data source filters. A predicate
+      // missing from this map cannot be pushed down.
+      val translated: Map[Expression, Filter] = filters.flatMap { predicate =>
+        DataSourceStrategy.translateFilter(predicate).map(f => predicate -> f)
+      }.toMap
+
+      // Predicates that have no data source filter equivalent.
+      val nonConvertible = filters.filterNot(translated.contains)
+
+      // Filters returned by the reader are the ones for which it cannot guarantee that the
+      // returned rows pass them, so their predicates need an extra filter operator in Spark.
+      val rejected = r.pushFilters(translated.values.toArray).toSet
+      val unhandled = translated.filter { case (_, f) =>
+        rejected.contains(f)
+      }.keys
+
+      nonConvertible ++ unhandled
+
+    case _ => filters
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
new file mode 100644
index 0000000..50900e9
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
+
+  class Reader implements DataSourceV2Reader, SupportsPushDownRequiredColumns, 
SupportsPushDownFilters {
+    private StructType requiredSchema = new StructType().add("i", 
"int").add("j", "int");
+    private Filter[] filters = new Filter[0];
+
+    @Override
+    public StructType readSchema() {
+      return requiredSchema;
+    }
+
+    @Override
+    public void pruneColumns(StructType requiredSchema) {
+      this.requiredSchema = requiredSchema;
+    }
+
+    @Override
+    public Filter[] pushFilters(Filter[] filters) {
+      this.filters = filters;
+      return new Filter[0];
+    }
+
+    @Override
+    public List<ReadTask<Row>> createReadTasks() {
+      List<ReadTask<Row>> res = new ArrayList<>();
+
+      Integer lowerBound = null;
+      for (Filter filter : filters) {
+        if (filter instanceof GreaterThan) {
+          GreaterThan f = (GreaterThan) filter;
+          if ("i".equals(f.attribute()) && f.value() instanceof Integer) {
+            lowerBound = (Integer) f.value();
+            break;
+          }
+        }
+      }
+
+      if (lowerBound == null) {
+        res.add(new JavaAdvancedReadTask(0, 5, requiredSchema));
+        res.add(new JavaAdvancedReadTask(5, 10, requiredSchema));
+      } else if (lowerBound < 4) {
+        res.add(new JavaAdvancedReadTask(lowerBound + 1, 5, requiredSchema));
+        res.add(new JavaAdvancedReadTask(5, 10, requiredSchema));
+      } else if (lowerBound < 9) {
+        res.add(new JavaAdvancedReadTask(lowerBound + 1, 10, requiredSchema));
+      }
+
+      return res;
+    }
+  }
+
+  static class JavaAdvancedReadTask implements ReadTask<Row>, DataReader<Row> {
+    private int start;
+    private int end;
+    private StructType requiredSchema;
+
+    JavaAdvancedReadTask(int start, int end, StructType requiredSchema) {
+      this.start = start;
+      this.end = end;
+      this.requiredSchema = requiredSchema;
+    }
+
+    @Override
+    public DataReader<Row> createReader() {
+      return new JavaAdvancedReadTask(start - 1, end, requiredSchema);
+    }
+
+    @Override
+    public boolean next() {
+      start += 1;
+      return start < end;
+    }
+
+    @Override
+    public Row get() {
+      Object[] values = new Object[requiredSchema.size()];
+      for (int i = 0; i < values.length; i++) {
+        if ("i".equals(requiredSchema.apply(i).name())) {
+          values[i] = start;
+        } else if ("j".equals(requiredSchema.apply(i).name())) {
+          values[i] = -start;
+        }
+      }
+      return new GenericRow(values);
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+  }
+
+
+  @Override
+  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+    return new Reader();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
new file mode 100644
index 0000000..a174bd8
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.util.List;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSchemaRequiredDataSource implements DataSourceV2, 
ReadSupportWithSchema {
+
+  class Reader implements DataSourceV2Reader {
+    private final StructType schema;
+
+    Reader(StructType schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public StructType readSchema() {
+      return schema;
+    }
+
+    @Override
+    public List<ReadTask<Row>> createReadTasks() {
+      return java.util.Collections.emptyList();
+    }
+  }
+
+  @Override
+  public DataSourceV2Reader createReader(StructType schema, 
DataSourceV2Options options) {
+    return new Reader(schema);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
new file mode 100644
index 0000000..08469f1
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
+
+  class Reader implements DataSourceV2Reader {
+    private final StructType schema = new StructType().add("i", 
"int").add("j", "int");
+
+    @Override
+    public StructType readSchema() {
+      return schema;
+    }
+
+    @Override
+    public List<ReadTask<Row>> createReadTasks() {
+      return java.util.Arrays.asList(
+        new JavaSimpleReadTask(0, 5),
+        new JavaSimpleReadTask(5, 10));
+    }
+  }
+
+  static class JavaSimpleReadTask implements ReadTask<Row>, DataReader<Row> {
+    private int start;
+    private int end;
+
+    JavaSimpleReadTask(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public DataReader<Row> createReader() {
+      return new JavaSimpleReadTask(start - 1, end);
+    }
+
+    @Override
+    public boolean next() {
+      start += 1;
+      return start < end;
+    }
+
+    @Override
+    public Row get() {
+      return new GenericRow(new Object[] {start, -start});
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+  }
+
+  @Override
+  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+    return new Reader();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
new file mode 100644
index 0000000..9efe7c7
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
+
+  class Reader implements DataSourceV2Reader, SupportsScanUnsafeRow {
+    private final StructType schema = new StructType().add("i", 
"int").add("j", "int");
+
+    @Override
+    public StructType readSchema() {
+      return schema;
+    }
+
+    @Override
+    public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() {
+      return java.util.Arrays.asList(
+        new JavaUnsafeRowReadTask(0, 5),
+        new JavaUnsafeRowReadTask(5, 10));
+    }
+  }
+
+  static class JavaUnsafeRowReadTask implements ReadTask<UnsafeRow>, 
DataReader<UnsafeRow> {
+    private int start;
+    private int end;
+    private UnsafeRow row;
+
+    JavaUnsafeRowReadTask(int start, int end) {
+      this.start = start;
+      this.end = end;
+      this.row = new UnsafeRow(2);
+      row.pointTo(new byte[8 * 3], 8 * 3);
+    }
+
+    @Override
+    public DataReader<UnsafeRow> createReader() {
+      return new JavaUnsafeRowReadTask(start - 1, end);
+    }
+
+    @Override
+    public boolean next() {
+      start += 1;
+      return start < end;
+    }
+
+    @Override
+    public UnsafeRow get() {
+      row.setInt(0, start);
+      row.setInt(1, -start);
+      return row;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+  }
+
+  @Override
+  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+    return new Reader();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala
new file mode 100644
index 0000000..933f407
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Verifies how `DataSourceV2Options` treats letter case: lookups ignore the case of the key,
+ * while values are returned exactly as they were supplied.
+ */
+class DataSourceV2OptionsSuite extends SparkFunSuite {
+
+  test("key is case-insensitive") {
+    val underlying = Map("foo" -> "bar")
+    val options = new DataSourceV2Options(underlying.asJava)
+    // The same key spelled with different casing resolves to the same value.
+    Seq("foo", "FoO").foreach { key =>
+      assert(options.get(key).get() == "bar")
+    }
+    assert(!options.get("abc").isPresent)
+  }
+
+  test("value is case-sensitive") {
+    val underlying = Map("foo" -> "bAr")
+    val options = new DataSourceV2Options(underlying.asJava)
+    assert(options.get("foo").get() == "bAr")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7307acd/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
new file mode 100644
index 0000000..9ce93d7
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.util.{ArrayList, List => JList}
+
+import test.org.apache.spark.sql.sources.v2._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.sources.{Filter, GreaterThan}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+/**
+ * End-to-end tests for the data source v2 read path. Each test runs the same assertions
+ * against a Scala implementation and its Java counterpart.
+ */
+class DataSourceV2Suite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  test("simplest implementation") {
+    for (cls <- Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2])) {
+      withClue(cls.getName) {
+        val df = spark.read.format(cls.getName).load()
+        // Full scan, projection, and a filter evaluated on the scan output.
+        checkAnswer(df, (0 until 10).map(i => Row(i, -i)))
+        checkAnswer(df.select($"j"), (0 until 10).map(i => Row(-i)))
+        checkAnswer(df.filter($"i" > 5), (6 until 10).map(i => Row(i, -i)))
+      }
+    }
+  }
+
+  test("advanced implementation") {
+    for (cls <- Seq(classOf[AdvancedDataSourceV2], classOf[JavaAdvancedDataSourceV2])) {
+      withClue(cls.getName) {
+        val df = spark.read.format(cls.getName).load()
+        checkAnswer(df, (0 until 10).map(i => Row(i, -i)))
+        checkAnswer(df.select($"j"), (0 until 10).map(i => Row(-i)))
+        checkAnswer(df.filter($"i" > 3), (4 until 10).map(i => Row(i, -i)))
+        // Filter on a column that is not part of the projection.
+        checkAnswer(df.select($"j").filter($"i" > 6), (7 until 10).map(i => Row(-i)))
+        // Lower bound beyond the data: nothing is returned.
+        checkAnswer(df.select($"i").filter($"i" > 10), Nil)
+      }
+    }
+  }
+
+  test("unsafe row implementation") {
+    for (cls <- Seq(classOf[UnsafeRowDataSourceV2], classOf[JavaUnsafeRowDataSourceV2])) {
+      withClue(cls.getName) {
+        val df = spark.read.format(cls.getName).load()
+        checkAnswer(df, (0 until 10).map(i => Row(i, -i)))
+        checkAnswer(df.select($"j"), (0 until 10).map(i => Row(-i)))
+        checkAnswer(df.filter($"i" > 5), (6 until 10).map(i => Row(i, -i)))
+      }
+    }
+  }
+
+  test("schema required data source") {
+    for (cls <- Seq(classOf[SchemaRequiredDataSource], classOf[JavaSchemaRequiredDataSource])) {
+      withClue(cls.getName) {
+        // Loading without a user-specified schema must fail analysis.
+        val e = intercept[AnalysisException] {
+          spark.read.format(cls.getName).load()
+        }
+        assert(e.message.contains("A schema needs to be specified"))
+
+        // With a schema, the source reports that schema and yields no rows.
+        val schema = new StructType().add("i", "int").add("s", "string")
+        val df = spark.read.format(cls.getName).schema(schema).load()
+
+        assert(df.schema == schema)
+        assert(df.collect().isEmpty)
+      }
+    }
+  }
+}
+
+/**
+ * Minimal Scala data source v2 used by the tests: two int columns, `i` and `j`, read
+ * through two `SimpleReadTask`s covering [0, 5) and [5, 10).
+ */
+class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
+
+  /** Reader with a fixed schema and two fixed read tasks. */
+  class Reader extends DataSourceV2Reader {
+    override def readSchema(): StructType = {
+      new StructType().add("i", "int").add("j", "int")
+    }
+
+    override def createReadTasks(): JList[ReadTask[Row]] = {
+      val firstHalf: ReadTask[Row] = new SimpleReadTask(0, 5)
+      val secondHalf: ReadTask[Row] = new SimpleReadTask(5, 10)
+      java.util.Arrays.asList(firstHalf, secondHalf)
+    }
+  }
+
+  // The options are not consulted; every reader serves the same fixed data.
+  override def createReader(options: DataSourceV2Options): DataSourceV2Reader = {
+    new Reader
+  }
+}
+
+/**
+ * Read task that is also its own reader type. A reader yields the rows (i, -i) for every
+ * i in [start, end).
+ *
+ * @param start first value to produce (inclusive)
+ * @param end   upper bound of the values to produce (exclusive)
+ */
+class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] with DataReader[Row] {
+  // Positioned one before `start` because next() advances before the first get().
+  private var cursor = start - 1
+
+  override def createReader(): DataReader[Row] = {
+    new SimpleReadTask(start, end)
+  }
+
+  override def next(): Boolean = {
+    cursor += 1
+    cursor < end
+  }
+
+  override def get(): Row = Row.fromSeq(Seq(cursor, -cursor))
+
+  // Nothing to release.
+  override def close(): Unit = ()
+}
+
+
+
+/**
+ * Test data source that supports column pruning and filter push-down. Its data set is the
+ * rows (i, -i) for i in [0, 10); the only filter it evaluates itself is `i > <int>`.
+ */
+class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
+
+  class Reader extends DataSourceV2Reader
+    with SupportsPushDownRequiredColumns with SupportsPushDownFilters {
+
+    var requiredSchema = new StructType().add("i", "int").add("j", "int")
+    var filters = Array.empty[Filter]
+
+    /** Remembers the pruned schema; it is reported by `readSchema` and drives the tasks. */
+    override def pruneColumns(requiredSchema: StructType): Unit = {
+      this.requiredSchema = requiredSchema
+    }
+
+    /**
+     * Keeps the filters this source can evaluate (`GreaterThan` on column `i` with an int
+     * value) and returns all the others as unsupported, so that they are not silently dropped.
+     */
+    override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+      val (supported, unsupported) = filters.partition {
+        case GreaterThan("i", _: Int) => true
+        case _ => false
+      }
+      this.filters = supported
+      unsupported
+    }
+
+    override def readSchema(): StructType = {
+      requiredSchema
+    }
+
+    /**
+     * Creates the read tasks for the rows that survive the pushed `i > x` filters.
+     * Without such a filter the two default tasks [0, 5) and [5, 10) are returned.
+     */
+    override def createReadTasks(): JList[ReadTask[Row]] = {
+      // Every kept filter must hold, so the effective bound is the largest one.
+      val lowerBound = filters.collect {
+        case GreaterThan("i", v: Int) => v
+      }.reduceOption(_ max _)
+
+      val res = new ArrayList[ReadTask[Row]]
+
+      lowerBound match {
+        case None =>
+          res.add(new AdvancedReadTask(0, 5, requiredSchema))
+          res.add(new AdvancedReadTask(5, 10, requiredSchema))
+        case Some(bound) if bound < 4 =>
+          // Clamp at 0: a negative bound must not produce rows outside the data set.
+          res.add(new AdvancedReadTask(math.max(bound + 1, 0), 5, requiredSchema))
+          res.add(new AdvancedReadTask(5, 10, requiredSchema))
+        case Some(bound) if bound < 9 =>
+          res.add(new AdvancedReadTask(bound + 1, 10, requiredSchema))
+        case _ =>
+          // Bound is at or beyond the last value: no rows qualify, so no tasks.
+      }
+
+      res
+    }
+  }
+
+  // The options are not consulted; every reader starts with the full schema and no filters.
+  override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
+}
+
+/**
+ * Read task (and reader) that yields, for every i in [start, end), a row holding only the
+ * columns named in `requiredSchema`: `i` maps to i and `j` maps to -i.
+ *
+ * @param start          first value to produce (inclusive)
+ * @param end            upper bound of the values to produce (exclusive)
+ * @param requiredSchema columns to emit, in output order
+ */
+class AdvancedReadTask(start: Int, end: Int, requiredSchema: StructType)
+  extends ReadTask[Row] with DataReader[Row] {
+
+  // Positioned one before `start` because next() advances before the first get().
+  private var cursor = start - 1
+
+  override def createReader(): DataReader[Row] = {
+    new AdvancedReadTask(start, end, requiredSchema)
+  }
+
+  override def next(): Boolean = {
+    cursor += 1
+    cursor < end
+  }
+
+  override def get(): Row = {
+    // Only "i" and "j" are known; any other column name fails with a MatchError.
+    Row.fromSeq(requiredSchema.map { field =>
+      field.name match {
+        case "i" => cursor
+        case "j" => -cursor
+      }
+    })
+  }
+
+  // Nothing to release.
+  override def close(): Unit = ()
+}
+
+
+/**
+ * Scala test source for the unsafe-row scan path: two int columns, `i` and `j`, read
+ * through two `UnsafeRowReadTask`s covering [0, 5) and [5, 10).
+ */
+class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
+
+  /** Reader with a fixed schema that hands out unsafe-row read tasks. */
+  class Reader extends DataSourceV2Reader with SupportsScanUnsafeRow {
+    override def readSchema(): StructType = {
+      new StructType().add("i", "int").add("j", "int")
+    }
+
+    override def createUnsafeRowReadTasks(): JList[ReadTask[UnsafeRow]] = {
+      val firstHalf: ReadTask[UnsafeRow] = new UnsafeRowReadTask(0, 5)
+      val secondHalf: ReadTask[UnsafeRow] = new UnsafeRowReadTask(5, 10)
+      java.util.Arrays.asList(firstHalf, secondHalf)
+    }
+  }
+
+  // The options are not consulted; every reader serves the same fixed data.
+  override def createReader(options: DataSourceV2Options): DataSourceV2Reader = {
+    new Reader
+  }
+}
+
+/**
+ * Read task (and reader) that yields the rows (i, -i) for every i in [start, end) as
+ * `UnsafeRow`s. A single row instance is reused for all results.
+ *
+ * @param start first value to produce (inclusive)
+ * @param end   upper bound of the values to produce (exclusive)
+ */
+class UnsafeRowReadTask(start: Int, end: Int)
+  extends ReadTask[UnsafeRow] with DataReader[UnsafeRow] {
+
+  // 24-byte backing buffer for a 2-field row; presumably one 8-byte word for the null
+  // bitset plus 8 bytes per field (verify against the UnsafeRow layout).
+  private val row = {
+    val buffer = new Array[Byte](8 * 3)
+    val unsafeRow = new UnsafeRow(2)
+    unsafeRow.pointTo(buffer, buffer.length)
+    unsafeRow
+  }
+
+  // Positioned one before `start` because next() advances before the first get().
+  private var cursor = start - 1
+
+  override def createReader(): DataReader[UnsafeRow] = {
+    new UnsafeRowReadTask(start, end)
+  }
+
+  override def next(): Boolean = {
+    cursor += 1
+    cursor < end
+  }
+
+  override def get(): UnsafeRow = {
+    // Overwrites and returns the shared row instance.
+    row.setInt(0, cursor)
+    row.setInt(1, -cursor)
+    row
+  }
+
+  // Nothing to release.
+  override def close(): Unit = ()
+}
+
+/**
+ * Test source that cannot infer a schema: it only implements `ReadSupportWithSchema`, so a
+ * reader can only be created from a caller-supplied schema. It never produces any rows.
+ */
+class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
+
+  /** Reports the schema it was constructed with and has no read tasks. */
+  class Reader(val readSchema: StructType) extends DataSourceV2Reader {
+    override def createReadTasks(): JList[ReadTask[Row]] = {
+      java.util.Collections.emptyList[ReadTask[Row]]()
+    }
+  }
+
+  // The options are not consulted; only the supplied schema is used.
+  override def createReader(
+      schema: StructType,
+      options: DataSourceV2Options): DataSourceV2Reader = {
+    new Reader(schema)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to