This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f6f6026  [SPARK-32364][SQL][FOLLOWUP] Add toMap to return originalMap and documentation
f6f6026 is described below

commit f6f6026965d59e1f79dec1740e1b2056adea94e8
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Jul 23 06:28:08 2020 -0700

    [SPARK-32364][SQL][FOLLOWUP] Add toMap to return originalMap and documentation
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/29160. We already removed the non-determinism there. This PR aims to do the following for the existing code base.
    1. Add explicit documentation to `DataFrameReader`/`DataFrameWriter`.
    
    2. Add `toMap` to `CaseInsensitiveMap` so that it returns `originalMap: Map[String, T]`, which is more consistent with the existing case-sensitive key name behavior for code patterns like `AppendData.byName(..., extraOptions.toMap)`. Previously, the inherited `HashMap.toMap` was used.
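
    A minimal sketch of the difference (not part of the patch; it assumes the inherited conversion builds its result from the lower-cased key view, which is what made the old behavior inconsistent):
    ```scala
    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    val m = CaseInsensitiveMap[String](Map("UrL" -> "url1"))
    m.toMap  // Map("UrL" -> "url1"): originalMap, keeping the user's original casing
    // Before this change, `toMap` came from the generic Map machinery and
    // yielded lower-cased keys, e.g. Map("url" -> "url1").
    ```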
    
    3. As part of (2), the following needs to change in order to keep the original logic using `CaseInsensitiveMap.++`.
    ```scala
    - val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
    + val params = extraOptions ++ connectionProperties.asScala
    ```
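
    For illustration (a sketch, not code from the patch), `CaseInsensitiveMap.++` keeps the case-insensitive override semantics that the new `JDBCSuite` test below exercises:
    ```scala
    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    val extra = CaseInsensitiveMap[String](Map("UrL" -> "url1"))
    val params = extra ++ Map("url" -> "url2")
    params.get("URL")  // Some("url2"): the later entry wins, matched case-insensitively
    params.size        // 1, whereas a plain Map `++` would keep both "UrL" and "url"
    ```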
    
    4. Additionally, use `.toMap` in the following because `dsOptions.asCaseSensitiveMap()` is used later.
    ```scala
    - val options = sessionOptions ++ extraOptions
    + val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
      val dsOptions = new CaseInsensitiveStringMap(options.asJava)
    ```
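
    A rough sketch, with made-up option names, of why this form keeps `asCaseSensitiveMap()` working:
    ```scala
    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    val sessionOptions = Map("path" -> "/from-session")
    val extraOptions = CaseInsensitiveMap[String](Map("Path" -> "/from-user"))

    // A session default survives only if no user option matches it case-insensitively,
    // and `.toMap` hands over the user's original key casing for asCaseSensitiveMap().
    val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
    // options == Map("Path" -> "/from-user")
    ```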
    
    ### Why are the changes needed?
    
    `extraOptions.toMap` is used in several places (e.g. `DataFrameReader`) to hand over a `Map[String, T]`. In this case, `CaseInsensitiveMap[T] private (val originalMap: Map[String, T])` should return `originalMap`.
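
    From the user's perspective, the behavior documented in (1) means a later `option` call replaces an earlier one whose key differs only in case. An illustrative sketch with a hypothetical input path:
    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("demo").master("local").getOrCreate()
    val df = spark.read
      .option("multiLine", "true")
      .option("MULTILINE", "false")  // replaces "multiLine": keys match case-insensitively
      .json("path/to/file.json")     // hypothetical path
    ```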
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the Jenkins or GitHub Action with the existing tests and a newly added test case in `JDBCSuite`.
    
    Closes #29191 from dongjoon-hyun/SPARK-32364-3.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit aed8dbab1d6725eb17f743c300451fcbdbfa3e97)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/catalyst/util/CaseInsensitiveMap.scala      |  2 ++
 .../sql/catalyst/util/CaseInsensitiveMap.scala      |  2 ++
 .../org/apache/spark/sql/DataFrameReader.scala      | 20 +++++++++++++++++++-
 .../org/apache/spark/sql/DataFrameWriter.scala      | 20 +++++++++++++++++++-
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 21 +++++++++++++++++++++
 5 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index c013888..14b8f62 100644
--- a/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ b/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -52,6 +52,8 @@ class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Ma
   override def -(key: String): Map[String, T] = {
     new CaseInsensitiveMap(originalMap.filter(!_._1.equalsIgnoreCase(key)))
   }
+
+  def toMap: Map[String, T] = originalMap
 }
 
 object CaseInsensitiveMap {
diff --git a/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index 66a2f25..1db4b7a4 100644
--- a/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ b/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -52,6 +52,8 @@ class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Ma
   override def removed(key: String): Map[String, T] = {
     new CaseInsensitiveMap(originalMap.filter(!_._1.equalsIgnoreCase(key)))
   }
+
+  def toMap: Map[String, T] = originalMap
 }
 
 object CaseInsensitiveMap {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index a291e58..5b78690 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -94,6 +94,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -121,6 +124,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Boolean): DataFrameReader = option(key, value.toString)
@@ -128,6 +134,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Long): DataFrameReader = option(key, value.toString)
@@ -135,6 +144,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Double): DataFrameReader = option(key, value.toString)
@@ -142,6 +154,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * (Scala-specific) Adds input options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -169,6 +184,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds input options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -361,7 +379,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       connectionProperties: Properties): DataFrame = {
     assertNoSpecifiedSchema("jdbc")
     // connectionProperties should override settings in extraOptions.
-    val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
+    val params = extraOptions ++ connectionProperties.asScala
     val options = new JDBCOptions(url, table, params)
     val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
       JDBCPartition(part, i) : Partition
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 952f896..f463166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -105,6 +105,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -132,6 +135,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Boolean): DataFrameWriter[T] = option(key, value.toString)
@@ -139,6 +145,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Long): DataFrameWriter[T] = option(key, value.toString)
@@ -146,6 +155,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Double): DataFrameWriter[T] = option(key, value.toString)
@@ -153,6 +165,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * (Scala-specific) Adds output options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -180,6 +195,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds output options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
@@ -288,7 +306,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val provider = maybeV2Provider.get
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         provider, df.sparkSession.sessionState.conf)
-      val options = sessionOptions ++ extraOptions
+      val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
       val dsOptions = new CaseInsensitiveStringMap(options.asJava)
 
       def getTable: Table = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 40237bb..b554754 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -21,6 +21,8 @@ import java.math.BigDecimal
 import java.sql.{Date, DriverManager, SQLException, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
+import scala.collection.JavaConverters._
+
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
@@ -1716,4 +1718,23 @@ class JDBCSuite extends QueryTest
     jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts)
     checkAnswer(jdbcDF, Row("mary", 2) :: Nil)
   }
+
+  test("SPARK-32364: JDBCOption constructor") {
+    val extraOptions = CaseInsensitiveMap[String](Map("UrL" -> "url1", "dBTable" -> "table1"))
+    val connectionProperties = new Properties()
+    connectionProperties.put("url", "url2")
+    connectionProperties.put("dbtable", "table2")
+
+    // connection property should override the options in extraOptions
+    val params = extraOptions ++ connectionProperties.asScala
+    assert(params.size == 2)
+    assert(params.get("uRl").contains("url2"))
+    assert(params.get("DbtaBle").contains("table2"))
+
+    // JDBCOptions constructor parameter should overwrite the existing conf
+    val options = new JDBCOptions(url, "table3", params)
+    assert(options.asProperties.size == 2)
+    assert(options.asProperties.get("url") == url)
+    assert(options.asProperties.get("dbtable") == "table3")
+  }
 }

