Repository: spark
Updated Branches:
  refs/heads/master 1e3b8762a -> 310a8cd06


[SPARK-23341][SQL] define some standard options for data source v2

## What changes were proposed in this pull request?

Each data source implementation can define its own options and teach its users how to set them. Spark doesn't place any restrictions on what options a data source should or should not have. It's possible that some options are very common and many data sources use them. However, different data sources may define the common options (key and meaning) differently, which is quite confusing to end users.

This PR defines some standard options that data sources can optionally adopt: path, paths, table, and database.
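
As a usage illustration (not part of this patch), a source that adopts these options could be addressed as sketched below; `exampleSource` is an invented format name, and only the option keys come from this PR:

    // Hypothetical sketch: "exampleSource" is a made-up format name; the
    // option keys (path, table, database) are the standard keys defined here.
    val byPath = spark.read
      .format("exampleSource")
      .option("path", "/data/events/*")   // file path, may contain wildcards
      .load()

    val byTable = spark.read
      .format("exampleSource")
      .option("database", "db")           // database name, taken literally
      .option("table", "tbl")             // table name, taken literally
      .load()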

## How was this patch tested?

New test cases in DataSourceOptionsSuite.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #20535 from cloud-fan/options.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/310a8cd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/310a8cd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/310a8cd0

Branch: refs/heads/master
Commit: 310a8cd06299e434d94a1e391a6eb62944112446
Parents: 1e3b876
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Apr 18 11:51:10 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Apr 18 11:51:10 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/sources/v2/DataSourceOptions.java | 100 +++++++++++++++++++
 .../org/apache/spark/sql/DataFrameReader.scala  |  14 ++-
 .../sql/sources/v2/DataSourceOptionsSuite.scala |  25 +++++
 3 files changed, 135 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/310a8cd0/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
index c320535..83df3be 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
@@ -17,16 +17,61 @@
 
 package org.apache.spark.sql.sources.v2;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Stream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
  * An immutable string-to-string map in which keys are case-insensitive. This is used to represent
  * data source options.
+ *
+ * Each data source implementation can define its own options and teach its users how to set them.
+ * Spark doesn't have any restrictions about what options a data source should or should not have.
+ * Instead Spark defines some standard options that data sources can optionally adopt. It's possible
+ * that some options are very common and many data sources use them. However different data
+ * sources may define the common options(key and meaning) differently, which is quite confusing to
+ * end users.
+ *
+ * The standard options defined by Spark:
+ * <table summary="standard data source options">
+ *   <tr>
+ *     <th><b>Option key</b></th>
+ *     <th><b>Option value</b></th>
+ *   </tr>
+ *   <tr>
+ *     <td>path</td>
+ *     <td>A path string of the data files/directories, like
+ *     <code>path1</code>, <code>/absolute/file2</code>, <code>path3/*</code>. The path can
+ *     either be relative or absolute, points to either file or directory, and can contain
+ *     wildcards. This option is commonly used by file-based data sources.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>paths</td>
+ *     <td>A JSON array style paths string of the data files/directories, like
+ *     <code>["path1", "/absolute/file2"]</code>. The format of each path is 
same as the
+ *     <code>path</code> option, plus it should follow JSON string literal 
format, e.g. quotes
+ *     should be escaped, <code>pa\"th</code> means pa"th.
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td>table</td>
+ *     <td>A table name string representing the table name directly without any interpretation.
+ *     For example, <code>db.tbl</code> means a table called db.tbl, not a table called tbl
+ *     inside database db. <code>`t*b.l`</code> means a table called `t*b.l`, not t*b.l.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>database</td>
+ *     <td>A database name string representing the database name directly without any
+ *     interpretation, which is very similar to the table name option.</td>
+ *   </tr>
+ * </table>
  */
 @InterfaceStability.Evolving
 public class DataSourceOptions {
@@ -97,4 +142,59 @@ public class DataSourceOptions {
     return keyLowerCasedMap.containsKey(lcaseKey) ?
       Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
   }
+
+  /**
+   * The option key for singular path.
+   */
+  public static final String PATH_KEY = "path";
+
+  /**
+   * The option key for multiple paths.
+   */
+  public static final String PATHS_KEY = "paths";
+
+  /**
+   * The option key for table name.
+   */
+  public static final String TABLE_KEY = "table";
+
+  /**
+   * The option key for database name.
+   */
+  public static final String DATABASE_KEY = "database";
+
+  /**
+   * Returns all the paths specified by both the singular path option and the multiple
+   * paths option.
+   */
+  public String[] paths() {
+    String[] singularPath =
+      get(PATH_KEY).map(s -> new String[]{s}).orElseGet(() -> new String[0]);
+    Optional<String> pathsStr = get(PATHS_KEY);
+    if (pathsStr.isPresent()) {
+      ObjectMapper objectMapper = new ObjectMapper();
+      try {
+        String[] paths = objectMapper.readValue(pathsStr.get(), String[].class);
+        return Stream.of(singularPath, paths).flatMap(Stream::of).toArray(String[]::new);
+      } catch (IOException e) {
+        return singularPath;
+      }
+    } else {
+      return singularPath;
+    }
+  }
+
+  /**
+   * Returns the value of the table name option.
+   */
+  public Optional<String> tableName() {
+    return get(TABLE_KEY);
+  }
+
+  /**
+   * Returns the value of the database name option.
+   */
+  public Optional<String> databaseName() {
+    return get(DATABASE_KEY);
+  }
 }
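
Below is an illustrative consumer of these helpers, not part of this commit: a made-up DataSourceV2 implementation reading the standard keys in createReader. The class name and the `???` reader construction are placeholders.

    import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
    import org.apache.spark.sql.sources.v2.reader.DataSourceReader

    // Hypothetical source: only shows where the standard-option helpers are read.
    class ExampleSource extends DataSourceV2 with ReadSupport {
      override def createReader(options: DataSourceOptions): DataSourceReader = {
        // paths() merges the singular "path" option with the JSON-encoded "paths" option.
        val paths: Seq[String] = options.paths().toSeq
        // Table and database names are returned verbatim, with no identifier parsing.
        val table: java.util.Optional[String] = options.tableName()
        val database: java.util.Optional[String] = options.databaseName()
        ??? // build a DataSourceReader from paths / table / database (omitted here)
      }
    }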

http://git-wip-us.apache.org/repos/asf/spark/blob/310a8cd0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ae3ba16..d640fdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,6 +21,8 @@ import java.util.{Locale, Properties}
 
 import scala.collection.JavaConverters._
 
+import com.fasterxml.jackson.databind.ObjectMapper
+
 import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.api.java.JavaRDD
@@ -34,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2.{DataSourceV2, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -171,7 +173,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def load(path: String): DataFrame = {
-    option("path", path).load(Seq.empty: _*) // force invocation of 
`load(...varargs...)`
+    // force invocation of `load(...varargs...)`
+    option(DataSourceOptions.PATH_KEY, path).load(Seq.empty: _*)
   }
 
   /**
@@ -193,10 +196,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           ds = ds, conf = sparkSession.sessionState.conf)
+        val pathsOption = {
+          val objectMapper = new ObjectMapper()
+          DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
+        }
         Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-          ds, extraOptions.toMap ++ sessionOptions,
+          ds, extraOptions.toMap ++ sessionOptions + pathsOption,
           userSpecifiedSchema = userSpecifiedSchema))
-
       } else {
         loadV1Source(paths: _*)
       }
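
A small standalone sketch (values invented) of the JSON round-trip relied on above: DataFrameReader writes the varargs paths as one JSON array string under the "paths" key, and DataSourceOptions.paths() parses it back with the same Jackson approach, so quotes inside a path survive via escaping.

    import com.fasterxml.jackson.databind.ObjectMapper

    val mapper = new ObjectMapper()
    // Encode two invented paths, one containing a quote character.
    val encoded = mapper.writeValueAsString(Array("p1", """pa"th2"""))
    // encoded is the JSON string ["p1","pa\"th2"]
    val decoded = mapper.readValue(encoded, classOf[Array[String]])
    // decoded.toSeq == Seq("p1", "pa\"th2")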

http://git-wip-us.apache.org/repos/asf/spark/blob/310a8cd0/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
index 31dfc55..cfa69a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
@@ -79,4 +79,29 @@ class DataSourceOptionsSuite extends SparkFunSuite {
       options.getDouble("foo", 0.1d)
     }
   }
+
+  test("standard options") {
+    val options = new DataSourceOptions(Map(
+      DataSourceOptions.PATH_KEY -> "abc",
+      DataSourceOptions.TABLE_KEY -> "tbl").asJava)
+
+    assert(options.paths().toSeq == Seq("abc"))
+    assert(options.tableName().get() == "tbl")
+    assert(!options.databaseName().isPresent)
+  }
+
+  test("standard options with both singular path and multi-paths") {
+    val options = new DataSourceOptions(Map(
+      DataSourceOptions.PATH_KEY -> "abc",
+      DataSourceOptions.PATHS_KEY -> """["c", "d"]""").asJava)
+
+    assert(options.paths().toSeq == Seq("abc", "c", "d"))
+  }
+
+  test("standard options with only multi-paths") {
+    val options = new DataSourceOptions(Map(
+      DataSourceOptions.PATHS_KEY -> """["c", "d\"e"]""").asJava)
+
+    assert(options.paths().toSeq == Seq("c", "d\"e"))
+  }
 }

