Repository: spark Updated Branches: refs/heads/master 1e3b8762a -> 310a8cd06
[SPARK-23341][SQL] define some standard options for data source v2 ## What changes were proposed in this pull request? Each data source implementation can define its own options and teach its users how to set them. Spark doesn't have any restrictions about what options a data source should or should not have. It's possible that some options are very common and many data sources use them. However different data sources may define the common options(key and meaning) differently, which is quite confusing to end users. This PR defines some standard options that data sources can optionally adopt: path, table and database. ## How was this patch tested? a new test case. Author: Wenchen Fan <wenc...@databricks.com> Closes #20535 from cloud-fan/options. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/310a8cd0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/310a8cd0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/310a8cd0 Branch: refs/heads/master Commit: 310a8cd06299e434d94a1e391a6eb62944112446 Parents: 1e3b876 Author: Wenchen Fan <wenc...@databricks.com> Authored: Wed Apr 18 11:51:10 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Apr 18 11:51:10 2018 +0800 ---------------------------------------------------------------------- .../spark/sql/sources/v2/DataSourceOptions.java | 100 +++++++++++++++++++ .../org/apache/spark/sql/DataFrameReader.scala | 14 ++- .../sql/sources/v2/DataSourceOptionsSuite.scala | 25 +++++ 3 files changed, 135 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/310a8cd0/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java index c320535..83df3be 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java @@ -17,16 +17,61 @@ package org.apache.spark.sql.sources.v2; +import java.io.IOException; import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.spark.annotation.InterfaceStability; /** * An immutable string-to-string map in which keys are case-insensitive. This is used to represent * data source options. + * + * Each data source implementation can define its own options and teach its users how to set them. + * Spark doesn't have any restrictions about what options a data source should or should not have. + * Instead Spark defines some standard options that data sources can optionally adopt. It's possible + * that some options are very common and many data sources use them. However different data + * sources may define the common options(key and meaning) differently, which is quite confusing to + * end users. + * + * The standard options defined by Spark: + * <table summary="standard data source options"> + * <tr> + * <th><b>Option key</b></th> + * <th><b>Option value</b></th> + * </tr> + * <tr> + * <td>path</td> + * <td>A path string of the data files/directories, like + * <code>path1</code>, <code>/absolute/file2</code>, <code>path3/*</code>. The path can + * either be relative or absolute, points to either file or directory, and can contain + * wildcards. This option is commonly used by file-based data sources.</td> + * </tr> + * <tr> + * <td>paths</td> + * <td>A JSON array style paths string of the data files/directories, like + * <code>["path1", "/absolute/file2"]</code>. The format of each path is same as the + * <code>path</code> option, plus it should follow JSON string literal format, e.g. quotes + * should be escaped, <code>pa\"th</code> means pa"th. + * </td> + * </tr> + * <tr> + * <td>table</td> + * <td>A table name string representing the table name directly without any interpretation. + * For example, <code>db.tbl</code> means a table called db.tbl, not a table called tbl + * inside database db. <code>`t*b.l`</code> means a table called `t*b.l`, not t*b.l.</td> + * </tr> + * <tr> + * <td>database</td> + * <td>A database name string representing the database name directly without any + * interpretation, which is very similar to the table name option.</td> + * </tr> + * </table> */ @InterfaceStability.Evolving public class DataSourceOptions { @@ -97,4 +142,59 @@ public class DataSourceOptions { return keyLowerCasedMap.containsKey(lcaseKey) ? Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue; } + + /** + * The option key for singular path. + */ + public static final String PATH_KEY = "path"; + + /** + * The option key for multiple paths. + */ + public static final String PATHS_KEY = "paths"; + + /** + * The option key for table name. + */ + public static final String TABLE_KEY = "table"; + + /** + * The option key for database name. + */ + public static final String DATABASE_KEY = "database"; + + /** + * Returns all the paths specified by both the singular path option and the multiple + * paths option. + */ + public String[] paths() { + String[] singularPath = + get(PATH_KEY).map(s -> new String[]{s}).orElseGet(() -> new String[0]); + Optional<String> pathsStr = get(PATHS_KEY); + if (pathsStr.isPresent()) { + ObjectMapper objectMapper = new ObjectMapper(); + try { + String[] paths = objectMapper.readValue(pathsStr.get(), String[].class); + return Stream.of(singularPath, paths).flatMap(Stream::of).toArray(String[]::new); + } catch (IOException e) { + return singularPath; + } + } else { + return singularPath; + } + } + + /** + * Returns the value of the table name option. + */ + public Optional<String> tableName() { + return get(TABLE_KEY); + } + + /** + * Returns the value of the database name option. + */ + public Optional<String> databaseName() { + return get(DATABASE_KEY); + } } http://git-wip-us.apache.org/repos/asf/spark/blob/310a8cd0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index ae3ba16..d640fdc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -21,6 +21,8 @@ import java.util.{Locale, Properties} import scala.collection.JavaConverters._ +import com.fasterxml.jackson.databind.ObjectMapper + import org.apache.spark.Partition import org.apache.spark.annotation.InterfaceStability import org.apache.spark.api.java.JavaRDD @@ -34,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils -import org.apache.spark.sql.sources.v2.{DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @@ -171,7 +173,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def load(path: String): DataFrame = { - option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)` + // force invocation of `load(...varargs...)` + option(DataSourceOptions.PATH_KEY, path).load(Seq.empty: _*) } /** @@ -193,10 +196,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) { val sessionOptions = DataSourceV2Utils.extractSessionConfigs( ds = ds, conf = sparkSession.sessionState.conf) + val pathsOption = { + val objectMapper = new ObjectMapper() + DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray) + } Dataset.ofRows(sparkSession, DataSourceV2Relation.create( - ds, extraOptions.toMap ++ sessionOptions, + ds, extraOptions.toMap ++ sessionOptions + pathsOption, userSpecifiedSchema = userSpecifiedSchema)) - } else { loadV1Source(paths: _*) } http://git-wip-us.apache.org/repos/asf/spark/blob/310a8cd0/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala index 31dfc55..cfa69a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala @@ -79,4 +79,29 @@ class DataSourceOptionsSuite extends SparkFunSuite { options.getDouble("foo", 0.1d) } } + + test("standard options") { + val options = new DataSourceOptions(Map( + DataSourceOptions.PATH_KEY -> "abc", + DataSourceOptions.TABLE_KEY -> "tbl").asJava) + + assert(options.paths().toSeq == Seq("abc")) + assert(options.tableName().get() == "tbl") + assert(!options.databaseName().isPresent) + } + + test("standard options with both singular path and multi-paths") { + val options = new DataSourceOptions(Map( + DataSourceOptions.PATH_KEY -> "abc", + DataSourceOptions.PATHS_KEY -> """["c", "d"]""").asJava) + + assert(options.paths().toSeq == Seq("abc", "c", "d")) + } + + test("standard options with only multi-paths") { + val options = new DataSourceOptions(Map( + DataSourceOptions.PATHS_KEY -> """["c", "d\"e"]""").asJava) + + assert(options.paths().toSeq == Seq("c", "d\"e")) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org