Repository: spark Updated Branches: refs/heads/master bb2f069cf -> e06da95cd
[SPARK-25425][SQL] Extra options should override session options in DataSource V2 ## What changes were proposed in this pull request? In this PR, I propose overriding session options with extra options in DataSource V2. Extra options are more specific and set via `.option()`, and should overwrite more generic session options. With `++`, entries from the second map overwrite entries with the same key from the first map, for example: ```Scala scala> Map("option" -> false) ++ Map("option" -> true) res0: scala.collection.immutable.Map[String,Boolean] = Map(option -> true) ``` ## How was this patch tested? Added a test for checking which option is propagated to a data source in `load()`. Closes #22413 from MaxGekk/session-options. Lead-authored-by: Maxim Gekk <maxim.g...@databricks.com> Co-authored-by: Dongjoon Hyun <dongj...@apache.org> Co-authored-by: Maxim Gekk <max.g...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e06da95c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e06da95c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e06da95c Branch: refs/heads/master Commit: e06da95cd9423f55cdb154a2778b0bddf7be984c Parents: bb2f069 Author: Maxim Gekk <maxim.g...@databricks.com> Authored: Sat Sep 15 17:24:11 2018 -0700 Committer: Dongjoon Hyun <dongj...@apache.org> Committed: Sat Sep 15 17:24:11 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/DataFrameReader.scala | 2 +- .../org/apache/spark/sql/DataFrameWriter.scala | 8 +++-- .../sql/sources/v2/DataSourceV2Suite.scala | 35 +++++++++++++++++++- .../sources/v2/SimpleWritableDataSource.scala | 6 +++- 4 files changed, 45 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/e06da95c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index e6c2cba..fe69f25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -202,7 +202,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray) } Dataset.ofRows(sparkSession, DataSourceV2Relation.create( - ds, extraOptions.toMap ++ sessionOptions + pathsOption, + ds, sessionOptions ++ extraOptions.toMap + pathsOption, userSpecifiedSchema = userSpecifiedSchema)) } else { loadV1Source(paths: _*) http://git-wip-us.apache.org/repos/asf/spark/blob/e06da95c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index dfb8c47..188fce7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -241,10 +241,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val source = cls.newInstance().asInstanceOf[DataSourceV2] source match { case provider: BatchWriteSupportProvider => - val options = extraOptions ++ - DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf) + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( + source, + df.sparkSession.sessionState.conf) + val options = sessionOptions ++ extraOptions - val relation = 
DataSourceV2Relation.create(source, options.toMap) + val relation = DataSourceV2Relation.create(source, options) if (mode == SaveMode.Append) { runCommand(df.sparkSession, "save") { AppendData.byName(relation, df.logicalPlan) http://git-wip-us.apache.org/repos/asf/spark/blob/e06da95c/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index f6c3e0c..7cc8abc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.sources.v2 +import java.io.File + import test.org.apache.spark.sql.sources.v2._ import org.apache.spark.SparkException @@ -317,6 +319,38 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { checkCanonicalizedOutput(df, 2, 2) checkCanonicalizedOutput(df.select('i), 2, 1) } + + test("SPARK-25425: extra options should override sessions options during reading") { + val prefix = "spark.datasource.userDefinedDataSource." 
+ val optionName = "optionA" + withSQLConf(prefix + optionName -> "true") { + val df = spark + .read + .option(optionName, false) + .format(classOf[DataSourceV2WithSessionConfig].getName).load() + val options = df.queryExecution.optimizedPlan.collectFirst { + case d: DataSourceV2Relation => d.options + } + assert(options.get.get(optionName) == Some("false")) + } + } + + test("SPARK-25425: extra options should override sessions options during writing") { + withTempPath { path => + val sessionPath = path.getCanonicalPath + withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) { + withTempPath { file => + val optionPath = file.getCanonicalPath + val format = classOf[SimpleWritableDataSource].getName + + val df = Seq((1L, 2L)).toDF("i", "j") + df.write.format(format).option("path", optionPath).save() + assert(!new File(sessionPath).exists) + checkAnswer(spark.read.format(format).option("path", optionPath).load(), df) + } + } + } + } } @@ -385,7 +419,6 @@ class SimpleDataSourceV2 extends DataSourceV2 with BatchReadSupportProvider { } } - class AdvancedDataSourceV2 extends DataSourceV2 with BatchReadSupportProvider { class ReadSupport extends SimpleReadSupport { http://git-wip-us.apache.org/repos/asf/spark/blob/e06da95c/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index 952241b..a0f4404 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -39,10 +39,14 @@ import org.apache.spark.util.SerializableConfiguration * Each job moves files from `target/_temporary/queryId/` to `target`. 
*/ class SimpleWritableDataSource extends DataSourceV2 - with BatchReadSupportProvider with BatchWriteSupportProvider { + with BatchReadSupportProvider + with BatchWriteSupportProvider + with SessionConfigSupport { private val schema = new StructType().add("i", "long").add("j", "long") + override def keyPrefix: String = "simpleWritableDataSource" + class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport { override def fullSchema(): StructType = schema --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org