This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f6f6026  [SPARK-32364][SQL][FOLLOWUP] Add toMap to return originalMap and documentation
f6f6026 is described below

commit f6f6026965d59e1f79dec1740e1b2056adea94e8
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Jul 23 06:28:08 2020 -0700

    [SPARK-32364][SQL][FOLLOWUP] Add toMap to return originalMap and documentation

    ### What changes were proposed in this pull request?

    This is a follow-up of https://github.com/apache/spark/pull/29160, where the
    non-determinism was already removed. This PR aims to do the following for the
    existing code base.
    1. Add explicit documentation to `DataFrameReader`/`DataFrameWriter`.
    2. Add `toMap` to `CaseInsensitiveMap` so that it returns
       `originalMap: Map[String, T]`, which is more consistent with the existing
       case-sensitive key-name behavior for code patterns like
       `AppendData.byName(..., extraOptions.toMap)`. Previously, `toMap` resolved
       to `HashMap.toMap`.
    3. As part of (2), change the following so that the original logic keeps
       using `CaseInsensitiveMap.++`.
    ```scala
    - val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
    + val params = extraOptions ++ connectionProperties.asScala
    ```
    4. Additionally, use `.toMap` in the following because
       `dsOptions.asCaseSensitiveMap()` is used later.
    ```scala
    - val options = sessionOptions ++ extraOptions
    + val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
      val dsOptions = new CaseInsensitiveStringMap(options.asJava)
    ```

    ### Why are the changes needed?

    `extraOptions.toMap` is used in several places (e.g. `DataFrameReader`) to
    hand over a `Map[String, T]`. In this case,
    `CaseInsensitiveMap[T] private (val originalMap: Map[String, T])` should
    return `originalMap`.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the Jenkins or GitHub Actions runs with the existing tests and the newly
    added test case in `JDBCSuite`.

    Closes #29191 from dongjoon-hyun/SPARK-32364-3.
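For reference, a minimal sketch of the `CaseInsensitiveMap` semantics described above. The keys and values (`path`, `/tmp/a`, `/tmp/b`) are hypothetical, and the sketch assumes this branch's `CaseInsensitiveMap.apply`, its case-insensitive `++`, and the `toMap` added by this patch:

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Keys match case-insensitively, so adding "PATH" replaces the earlier "path".
val opts = CaseInsensitiveMap(Map("path" -> "/tmp/a")) ++ Map("PATH" -> "/tmp/b")
assert(opts.get("pAtH").contains("/tmp/b"))

// With this patch, toMap returns originalMap, so callers see the
// case-preserved key "PATH" instead of the lower-cased iterator view.
assert(opts.toMap.keySet == Set("PATH"))
```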
Authored-by: Dongjoon Hyun <dongj...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
(cherry picked from commit aed8dbab1d6725eb17f743c300451fcbdbfa3e97)
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/catalyst/util/CaseInsensitiveMap.scala      |  2 ++
 .../sql/catalyst/util/CaseInsensitiveMap.scala      |  2 ++
 .../org/apache/spark/sql/DataFrameReader.scala      | 20 +++++++++++++++++++-
 .../org/apache/spark/sql/DataFrameWriter.scala      | 20 +++++++++++++++++++-
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 21 +++++++++++++++++++++
 5 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index c013888..14b8f62 100644
--- a/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ b/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -52,6 +52,8 @@ class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Ma
   override def -(key: String): Map[String, T] = {
     new CaseInsensitiveMap(originalMap.filter(!_._1.equalsIgnoreCase(key)))
   }
+
+  def toMap: Map[String, T] = originalMap
 }
 
 object CaseInsensitiveMap {
diff --git a/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index 66a2f25..1db4b7a4 100644
--- a/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ b/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -52,6 +52,8 @@ class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Ma
   override def removed(key: String): Map[String, T] = {
     new CaseInsensitiveMap(originalMap.filter(!_._1.equalsIgnoreCase(key)))
   }
+
+  def toMap: Map[String, T] = originalMap
 }
 
 object CaseInsensitiveMap {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index a291e58..5b78690 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -94,6 +94,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -121,6 +124,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Boolean): DataFrameReader = option(key, value.toString)
@@ -128,6 +134,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Long): DataFrameReader = option(key, value.toString)
@@ -135,6 +144,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Double): DataFrameReader = option(key, value.toString)
@@ -142,6 +154,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * (Scala-specific) Adds input options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -169,6 +184,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds input options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -361,7 +379,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       connectionProperties: Properties): DataFrame = {
     assertNoSpecifiedSchema("jdbc")
     // connectionProperties should override settings in extraOptions.
-    val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
+    val params = extraOptions ++ connectionProperties.asScala
     val options = new JDBCOptions(url, table, params)
     val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
       JDBCPartition(part, i) : Partition
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 952f896..f463166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -105,6 +105,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -132,6 +135,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Boolean): DataFrameWriter[T] = option(key, value.toString)
@@ -139,6 +145,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Long): DataFrameWriter[T] = option(key, value.toString)
@@ -146,6 +155,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Double): DataFrameWriter[T] = option(key, value.toString)
@@ -153,6 +165,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * (Scala-specific) Adds output options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -180,6 +195,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds output options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -288,7 +306,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val provider = maybeV2Provider.get
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         provider, df.sparkSession.sessionState.conf)
-      val options = sessionOptions ++ extraOptions
+      val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
       val dsOptions = new CaseInsensitiveStringMap(options.asJava)
 
       def getTable: Table = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 40237bb..b554754 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -21,6 +21,8 @@ import java.math.BigDecimal
 import java.sql.{Date, DriverManager, SQLException, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
+import scala.collection.JavaConverters._
+
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
@@ -1716,4 +1718,23 @@ class JDBCSuite extends QueryTest
     jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts)
     checkAnswer(jdbcDF, Row("mary", 2) :: Nil)
   }
+
+  test("SPARK-32364: JDBCOption constructor") {
+    val extraOptions = CaseInsensitiveMap[String](Map("UrL" -> "url1", "dBTable" -> "table1"))
+    val connectionProperties = new Properties()
+    connectionProperties.put("url", "url2")
+    connectionProperties.put("dbtable", "table2")
+
+    // connection property should override the options in extraOptions
+    val params = extraOptions ++ connectionProperties.asScala
+    assert(params.size == 2)
+    assert(params.get("uRl").contains("url2"))
+    assert(params.get("DbtaBle").contains("table2"))
+
+    // JDBCOptions constructor parameter should overwrite the existing conf
+    val options = new JDBCOptions(url, "table3", params)
+    assert(options.asProperties.size == 2)
+    assert(options.asProperties.get("url") == url)
+    assert(options.asProperties.get("dbtable") == "table3")
+  }
 }
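For reference, a minimal sketch of the merge semantics of the `DataFrameWriter` change above, assuming Scala 2.12 (where `Map.filterKeys` returns a `Map`); the option names and values (`compression`, `mergeSchema`, `snappy`, `zstd`) are hypothetical:

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Hypothetical session defaults and user-supplied options.
val sessionOptions = Map("compression" -> "snappy", "mergeSchema" -> "false")
val extraOptions = CaseInsensitiveMap[String](Map("COMPRESSION" -> "zstd"))

// Drop every session key the user overrode (contains is case-insensitive),
// then append the user options as a plain Map so their original key case
// survives, which matters because dsOptions.asCaseSensitiveMap() is used later.
val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap

assert(options("mergeSchema") == "false")  // untouched session default
assert(options("COMPRESSION") == "zstd")   // user option wins, original case kept
assert(!options.contains("compression"))   // no stale lower-cased duplicate
```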