This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 6a653a2  [SPARK-32364][SQL][2.4] Use CaseInsensitiveMap for DataFrameReader/Writer options
6a653a2 is described below

commit 6a653a2faeb05c1d0f91cbbcaf3c8e37b0d6e0bc
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Jul 23 16:09:27 2020 -0700

    [SPARK-32364][SQL][2.4] Use CaseInsensitiveMap for DataFrameReader/Writer options

    ### What changes were proposed in this pull request?

    This PR is a backport of SPARK-32364 (https://github.com/apache/spark/pull/29160, https://github.com/apache/spark/pull/29191).

    When a user has multiple options like `path`, `paTH`, and `PATH` for the same key `path`, `option/options` is non-deterministic because `extraOptions` is a `HashMap`. This PR aims to use `CaseInsensitiveMap` instead of `HashMap` to fix this bug fundamentally.

    As the following example shows, DataFrame's `option/options` has been non-deterministic in terms of case-insensitivity, because the options are stored in `extraOptions`, which is a `HashMap`:

    ```scala
    spark.read
      .option("paTh", "1")
      .option("PATH", "2")
      .option("Path", "3")
      .option("patH", "4")
      .load("5")
    ...
    org.apache.spark.sql.AnalysisException: Path does not exist: file:/.../1;
    ```

    Also, this PR adds the following:

    1. Adds explicit documentation to `DataFrameReader`/`DataFrameWriter`.
    2. Adds `toMap` to `CaseInsensitiveMap` so that it returns `originalMap: Map[String, T]`, because
       that is more consistent with the existing case-sensitive key-name behavior for existing code
       patterns like `AppendData.byName(..., extraOptions.toMap)`. Previously, it was `HashMap.toMap`.
    3. During (2), the following is changed to keep the original logic, using `CaseInsensitiveMap.++`:

       ```scala
       - val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
       + val params = extraOptions ++ connectionProperties.asScala
       ```

    4. Additionally, `.toMap` is used in the following because `dsOptions.asCaseSensitiveMap()` is
       used later:

       ```scala
       - val options = sessionOptions ++ extraOptions
       + val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
         val dsOptions = new CaseInsensitiveStringMap(options.asJava)
       ```

    `extraOptions.toMap` is used in several places (e.g. `DataFrameReader`) to hand over a `Map[String, T]`. In this case, `CaseInsensitiveMap[T] private (val originalMap: Map[String, T])` should return `originalMap`.

    ### Why are the changes needed?

    This fixes the non-deterministic behavior.

    ### Does this PR introduce _any_ user-facing change?

    Yes.

    ### How was this patch tested?

    Pass the Jenkins with the existing tests and the newly added test cases.
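    For illustration only (not part of this patch), here is a minimal, self-contained sketch of the
    "last option wins, in any casing" semantics described above. `MiniCIMap` is a hypothetical
    stand-in written for this description, not Spark's `CaseInsensitiveMap`:

    ```scala
    import java.util.Locale

    // A toy case-insensitive map: `+` first drops any existing key that matches
    // the new key case-insensitively, so the latest entry wins, while
    // `originalMap` preserves the caller's original key casing.
    class MiniCIMap[T](val originalMap: Map[String, T]) {
      private val lower = originalMap.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }

      def get(k: String): Option[T] = lower.get(k.toLowerCase(Locale.ROOT))

      def +(kv: (String, T)): MiniCIMap[T] =
        new MiniCIMap(originalMap.filter(!_._1.equalsIgnoreCase(kv._1)) + kv)

      def ++(xs: TraversableOnce[(String, T)]): MiniCIMap[T] = xs.foldLeft(this)(_ + _)

      def toMap: Map[String, T] = originalMap
    }

    object MiniCIMapDemo extends App {
      val m = new MiniCIMap[String](Map.empty) ++
        Seq("paTh" -> "1", "PATH" -> "2", "Path" -> "3", "patH" -> "4")
      assert(m.get("path").contains("4"))   // the last writer wins, regardless of casing
      assert(m.toMap == Map("patH" -> "4")) // the surviving key keeps its original casing
    }
    ```

    With a plain `HashMap`, all four spellings above would survive as distinct keys, and which one a
    later case-insensitive lookup sees is undefined.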
    Closes #29209 from dongjoon-hyun/SPARK-32364-2.4.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/catalyst/util/CaseInsensitiveMap.scala     |  8 ++++++-
 .../org/apache/spark/sql/DataFrameReader.scala     | 25 +++++++++++++++++++---
 .../org/apache/spark/sql/DataFrameWriter.scala     | 25 +++++++++++++++++++---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 24 ++++++++++++++++++++-
 .../sql/test/DataFrameReaderWriterSuite.scala      | 22 +++++++++++++++++++
 5 files changed, 96 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index 4b149d2..eb12d33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -35,15 +35,21 @@ class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Ma
   override def contains(k: String): Boolean =
     keyLowerCasedMap.contains(k.toLowerCase(Locale.ROOT))
 
-  override def +[B1 >: T](kv: (String, B1)): Map[String, B1] = {
+  override def +[B1 >: T](kv: (String, B1)): CaseInsensitiveMap[B1] = {
     new CaseInsensitiveMap(originalMap.filter(!_._1.equalsIgnoreCase(kv._1)) + kv)
   }
 
+  def ++(xs: TraversableOnce[(String, T)]): CaseInsensitiveMap[T] = {
+    xs.foldLeft(this)(_ + _)
+  }
+
   override def iterator: Iterator[(String, T)] = keyLowerCasedMap.iterator
 
   override def -(key: String): Map[String, T] = {
     new CaseInsensitiveMap(originalMap.filterKeys(!_.equalsIgnoreCase(key)))
   }
+
+  def toMap: Map[String, T] = originalMap
 }
 
 object CaseInsensitiveMap {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index c71f871..ce0a4e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -30,6 +30,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}
 import org.apache.spark.sql.execution.datasources.csv._
@@ -91,6 +92,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -107,6 +111,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Boolean): DataFrameReader = option(key, value.toString)
@@ -114,6 +121,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Long): DataFrameReader = option(key, value.toString)
@@ -121,6 +131,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Double): DataFrameReader = option(key, value.toString)
@@ -128,6 +141,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * (Scala-specific) Adds input options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -144,6 +160,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds input options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -234,7 +253,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
-    this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
+    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
     format("jdbc").load()
   }
 
@@ -305,7 +324,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       connectionProperties: Properties): DataFrame = {
     assertNoSpecifiedSchema("jdbc")
     // connectionProperties should override settings in extraOptions.
-    val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
+    val params = extraOptions ++ connectionProperties.asScala
     val options = new JDBCOptions(url, table, params)
     val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
       JDBCPartition(part, i) : Partition
@@ -790,6 +809,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
   private var userSpecifiedSchema: Option[StructType] = None
 
-  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index f47926c..3337f22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
@@ -98,6 +99,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -114,6 +118,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Boolean): DataFrameWriter[T] = option(key, value.toString)
@@ -121,6 +128,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Long): DataFrameWriter[T] = option(key, value.toString)
@@ -128,6 +138,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Double): DataFrameWriter[T] = option(key, value.toString)
@@ -135,6 +148,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * (Scala-specific) Adds output options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -151,6 +167,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds output options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -251,7 +270,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source,
           df.sparkSession.sessionState.conf)
-        val options = sessionOptions ++ extraOptions
+        val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
         val writer = ws.createWriter(
           UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
@@ -512,7 +531,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     // connectionProperties should override settings in extraOptions.
     this.extraOptions ++= connectionProperties.asScala
     // explicit url and dbtable should override all
-    this.extraOptions += ("url" -> url, "dbtable" -> table)
+    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
     format("jdbc").save()
   }
 
@@ -692,7 +711,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private var mode: SaveMode = SaveMode.ErrorIfExists
 
-  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
   private var partitioningColumns: Option[Seq[String]] = None
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0edd226..dc61f72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -21,6 +21,8 @@ import java.math.BigDecimal
 import java.sql.{Date, DriverManager, SQLException, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
+import scala.collection.JavaConverters._
+
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
@@ -1261,7 +1263,8 @@ class JDBCSuite extends QueryTest
     testJdbcOptions(new JDBCOptions(parameters))
     testJdbcOptions(new JDBCOptions(CaseInsensitiveMap(parameters)))
     // test add/remove key-value from the case-insensitive map
-    var modifiedParameters = CaseInsensitiveMap(Map.empty) ++ parameters
+    var modifiedParameters =
+      (CaseInsensitiveMap(Map.empty) ++ parameters).asInstanceOf[Map[String, String]]
     testJdbcOptions(new JDBCOptions(modifiedParameters))
     modifiedParameters -= "dbtable"
     assert(modifiedParameters.get("dbTAblE").isEmpty)
@@ -1585,4 +1588,23 @@ class JDBCSuite extends QueryTest
       checkNotPushdown(sql("SELECT name, theid FROM predicateOption WHERE theid = 1")),
       Row("fred", 1) :: Nil)
   }
+
+  test("SPARK-32364: JDBCOption constructor") {
+    val extraOptions = CaseInsensitiveMap[String](Map("UrL" -> "url1", "dBTable" -> "table1"))
+    val connectionProperties = new Properties()
+    connectionProperties.put("url", "url2")
+    connectionProperties.put("dbtable", "table2")
+
+    // connection property should override the options in extraOptions
+    val params = extraOptions ++ connectionProperties.asScala
+    assert(params.size == 2)
+    assert(params.get("uRl").contains("url2"))
+    assert(params.get("DbtaBle").contains("table2"))
+
+    // JDBCOptions constructor parameter should overwrite the existing conf
+    val options = new JDBCOptions(url, "table3", params)
+    assert(options.asProperties.size == 2)
+    assert(options.asProperties.get("url") == url)
+    assert(options.asProperties.get("dbtable") == "table3")
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 67cd0b9..2b5b227 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -212,6 +212,28 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     assert(LastOptions.parameters("opt3") == "3")
   }
 
+  test("SPARK-32364: path argument of load function should override all existing options") {
+    spark.read
+      .format("org.apache.spark.sql.test")
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .load("5")
+    assert(LastOptions.parameters("path") == "5")
+  }
+
+  test("SPARK-32364: path argument of save function should override all existing options") {
+    Seq(1).toDF.write
+      .format("org.apache.spark.sql.test")
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .save("5")
+    assert(LastOptions.parameters("path") == "5")
+  }
+
   test("pass partitionBy as options") {
     Seq(true, false).foreach { flag =>
       withSQLConf(SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS.key -> s"$flag") {
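As a closing illustration of the writer-side merge changed above (`sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap`), here is a small standalone sketch. The option keys and values are made up for illustration, and `containsCI` stands in for `CaseInsensitiveMap.contains`; this is not Spark code:

```scala
// Sketch of the DataFrameWriter merge step: a session config must not
// reintroduce a key the user already set with different casing, because
// the merged result is handed over as a plain (case-sensitive) Map.
object MergeDemo extends App {
  // hypothetical values, not real Spark session configs
  val sessionOptions = Map("PATH" -> "from-session", "fetchsize" -> "100")
  val extraOptions   = Map("path" -> "from-user") // user-supplied options

  // stand-in for CaseInsensitiveMap.contains
  def containsCI(m: Map[String, String], k: String): Boolean =
    m.keys.exists(_.equalsIgnoreCase(k))

  // Without the filter, both "PATH" and "path" would survive in the plain
  // map, and which one a case-insensitive reader sees would be undefined.
  val options = sessionOptions.filterKeys(k => !containsCI(extraOptions, k)) ++ extraOptions
  assert(options == Map("fetchsize" -> "100", "path" -> "from-user"))
}
```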