This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 44acb5a  [SPARK-32832][SS] Use CaseInsensitiveMap for DataStreamReader/Writer options
44acb5a is described below

commit 44acb5a03a1c3cebe0935ec6f2b2d59afbb8f7e2
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Wed Sep 9 23:41:32 2020 -0700

    [SPARK-32832][SS] Use CaseInsensitiveMap for DataStreamReader/Writer options

    This PR aims to fix the non-deterministic behavior of DataStreamReader/Writer
    options such as the following, where the value used for `path` depends on the
    internal ordering of a case-sensitive map rather than on the order in which the
    options were set:

    ```scala
    scala> spark.readStream.format("parquet").option("paTh", "1").option("PATH", "2").option("Path", "3").option("patH", "4").option("path", "5").load()
    org.apache.spark.sql.AnalysisException: Path does not exist: 1;
    ```

    Backing the options with CaseInsensitiveMap makes the behavior deterministic.
    This is a user-facing change, but the previous behavior was non-deterministic.
    Tested by the newly added test cases.

    Closes #29702 from dongjoon-hyun/SPARK-32832.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 2f85f9516cfc33a376871cf27f9fb4ac30ecbed8)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/streaming/DataStreamReader.scala     |  5 ++--
 .../spark/sql/streaming/DataStreamWriter.scala     |  5 ++--
 .../test/DataStreamReaderWriterSuite.scala         | 31 ++++++++++++++++++++++
 3 files changed, 37 insertions(+), 4 deletions(-)
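Before the diffs, a minimal illustrative sketch (not part of this commit) of the option
semantics the new test cases assert: CaseInsensitiveMap, the internal helper from
org.apache.spark.sql.catalyst.util that this patch switches to, looks keys up
case-insensitively and keeps only the last value written for a key. The object name and
standalone `main` below are assumptions made for demonstration only and require
spark-catalyst on the classpath.

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Hypothetical demo object, not part of this commit.
object CaseInsensitiveOptionsSketch {
  def main(args: Array[String]): Unit = {
    // Mirror the .option(...) calls from the new tests: the same logical key
    // is set five times under different casings.
    val options = Seq(
      "paTh" -> "1", "PATH" -> "2", "Path" -> "3", "patH" -> "4", "path" -> "5"
    ).foldLeft(CaseInsensitiveMap[String](Map.empty))(_ + _)

    // Lookup is case-insensitive and the last write wins, so only one entry
    // survives and it holds "5" -- the value the tests expect for "path".
    assert(options.size == 1)
    assert(options("PATH") == "5")
    println(options)
  }
}
```

Because DataStreamReader and DataStreamWriter now keep their extra options in this
structure, whichever casing a user sets last determines the value that reaches the
data source.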
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 1d7e4d3..6b30949 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -210,7 +211,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
       case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2] =>
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = sparkSession.sessionState.conf)
-        val options = sessionOptions ++ extraOptions
+        val options = sessionOptions ++ extraOptions.toMap
         val dsOptions = new CaseInsensitiveStringMap(options.asJava)
         val table = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
         import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
@@ -520,5 +521,5 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 
   private var userSpecifiedSchema: Option[StructType] = None
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 1d0ca4d..07ab400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -26,6 +26,7 @@ import org.apache.spark.annotation.Evolving
 import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -349,7 +350,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         source = provider, conf = df.sparkSession.sessionState.conf)
-      val options = sessionOptions ++ extraOptions
+      val options = sessionOptions ++ extraOptions.toMap
       val dsOptions = new CaseInsensitiveStringMap(options.asJava)
       val table = DataSourceV2Utils.getTableFromProvider(
         provider, dsOptions, userSpecifiedSchema = None)
@@ -472,7 +473,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var trigger: Trigger = Trigger.ProcessingTime(0L)
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
   private var foreachWriter: ForeachWriter[T] = null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index b646387..d90af35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -185,6 +185,37 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
+  test("SPARK-32832: later option should override earlier options for load()") {
+    spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .option("path", "5")
+      .load()
+    assert(LastOptions.parameters("path") == "5")
+  }
+
+  test("SPARK-32832: later option should override earlier options for start()") {
+    val ds = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+    assert(LastOptions.parameters.isEmpty)
+
+    val query = ds.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .option("path", "5")
+      .start()
+    assert(LastOptions.parameters("path") == "5")
+    query.stop()
+  }
+
   test("partitioning") {
     val df = spark.readStream
       .format("org.apache.spark.sql.streaming.test")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org