This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d5e13234dfb [SPARK-45399][SQL] XML: Add XML Options using newOption
d5e13234dfb is described below

commit d5e13234dfb29566c33427ccd2c4318f7dcdb6d2
Author:     Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
AuthorDate: Wed Oct 4 08:35:48 2023 +0900

    [SPARK-45399][SQL] XML: Add XML Options using newOption

    ### What changes were proposed in this pull request?
    Add XML options using `DataSourceOptions::newOption`.
    Also removed the legacy date and timestamp parsing fallback, since it is
    not applicable to the newly introduced XML data source.

    ### Why are the changes needed?
    Consistency with other formats like JSON and CSV.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    XML unit tests and GitHub Actions.

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #43201 from sandip-db/xml-cleanup-options.

    Authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/catalyst/xml/StaxXmlParser.scala     | 19 -----
 .../apache/spark/sql/catalyst/xml/TypeCast.scala   | 23 +-----
 .../apache/spark/sql/catalyst/xml/XmlOptions.scala | 87 +++++++++++-----------
 .../sql/execution/datasources/xml/XmlSuite.scala   |  2 +-
 4 files changed, 46 insertions(+), 85 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index 71022ba281c..ac29e234e5f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, BadRecordException
 import org.apache.spark.sql.catalyst.xml.StaxXmlParser.convertStream
 import org.apache.spark.sql.catalyst.xml.TypeCast._
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -47,24 +46,6 @@ class StaxXmlParser(
 
   private val factory = options.buildXmlFactory()
 
-  // Flags to signal if we need to fall back to the backward compatible behavior of parsing
-  // dates and timestamps.
-  // For more information, see comments for "enableDateTimeParsingFallback" option in XmlOptions.
-  private val enableParsingFallbackForTimestampType =
-    options.enableDateTimeParsingFallback
-      .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
-      .getOrElse {
-        SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
-          options.timestampFormatInRead.isEmpty
-      }
-  private val enableParsingFallbackForDateType =
-    options.enableDateTimeParsingFallback
-      .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
-      .getOrElse {
-        SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
-          options.dateFormatInRead.isEmpty
-      }
-
   /**
    * Parses a single XML string and turns it into either one resulting row or no row (if
    * the record is malformed).
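The TypeCast.scala change below removes the same fallback from parseXmlTimestamp, so an unparsable timestamp now propagates its error to the parse-mode handling instead of being retried with the legacy Spark 1.x/2.0 routine. A minimal sketch of the stricter behavior, using plain java.time as a stand-in for Spark's TimestampFormatter (the names, pattern, and values here are illustrative, not from the patch):

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object StrictParseSketch extends App {
  // One formatter, no fallback: any failure propagates to the caller,
  // where the data source's parse mode (e.g. PERMISSIVE) decides whether
  // the row becomes a corrupt record or the query fails.
  def parseStrict(value: String, fmt: DateTimeFormatter): LocalDateTime =
    LocalDateTime.parse(value, fmt)

  val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")
  println(parseStrict("2023-10-04T08:35:48", fmt)) // parses fine
  // parseStrict("October 4, 2023", fmt) would throw a DateTimeParseException
  // rather than fall back to a lenient legacy string-to-timestamp routine.
}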
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala
index b065dd41f28..3315196ffc7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala
@@ -22,10 +22,7 @@ import java.util.Locale
 
 import scala.util.Try
 import scala.util.control.Exception._
-import scala.util.control.NonFatal
 
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -85,25 +82,7 @@ private[sql] object TypeCast {
   }
 
   private def parseXmlTimestamp(value: String, options: XmlOptions): Long = {
-    try {
-      options.timestampFormatter.parse(value)
-    } catch {
-      case NonFatal(e) =>
-        // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-        // compatibility if enabled.
-        val enableParsingFallbackForTimestampType =
-          options.enableDateTimeParsingFallback
-            .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
-            .getOrElse {
-              SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
-                options.timestampFormatInRead.isEmpty
-            }
-        if (!enableParsingFallbackForTimestampType) {
-          throw e
-        }
-        val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(value))
-        DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
-    }
+    options.timestampFormatter.parse(value)
   }
 
   // TODO: This function unnecessarily does type dispatch. Should merge it with `castTo`.
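The XmlOptions.scala diff below replaces hard-coded option strings with constants registered through DataSourceOptions::newOption. A rough sketch of that registration pattern, assuming a simple mutable set as the registry (the real trait in org.apache.spark.sql.catalyst may differ in detail):

import scala.collection.mutable

// Illustrative stand-in for Spark's DataSourceOptions trait.
trait DataSourceOptionsSketch {
  // Registry of every option name this data source accepts.
  private val validOptions = mutable.Set.empty[String]

  // Registers an option name and returns it, so a constant can be
  // declared and registered in one expression.
  protected def newOption(name: String): String = {
    validOptions += name
    name
  }

  // Registers an option together with an alternative (alias) name.
  protected def newOption(name: String, alternative: String): Unit = {
    validOptions += name
    validOptions += alternative
  }

  def isValidOption(name: String): Boolean = validOptions.contains(name)
}

// Mirrors how the XmlOptions companion object applies the pattern below.
object XmlOptionsSketch extends DataSourceOptionsSketch {
  val ROW_TAG = newOption("rowTag")          // usable as parameters.getOrElse(ROW_TAG, ...)
  val COMPRESSION = newOption("compression")
  val ENCODING = "encoding"                  // an option with an alternative name
  val CHARSET = "charset"
  newOption(ENCODING, CHARSET)
}

Registering every name in one place lets the data source validate user-supplied options against a single registry rather than scattered string literals.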
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
index 3aef9916a89..d0cfff87279 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
@@ -42,7 +42,7 @@ private[sql] class XmlOptions(
   def this(
       parameters: Map[String, String] = Map.empty,
       defaultTimeZoneId: String = SQLConf.get.sessionLocalTimeZone,
-      defaultColumnNameOfCorruptRecord: String = "") = {
+      defaultColumnNameOfCorruptRecord: String = SQLConf.get.columnNameOfCorruptRecord) = {
     this(
       CaseInsensitiveMap(parameters),
       defaultTimeZoneId,
@@ -62,42 +62,39 @@ private[sql] class XmlOptions(
     }
   }
 
-  val compressionCodec = parameters.get("compression").orElse(parameters.get("codec"))
-    .map(CompressionCodecs.getCodecClassName)
-  val rowTag = parameters.getOrElse("rowTag", XmlOptions.DEFAULT_ROW_TAG)
-  require(rowTag.nonEmpty, "'rowTag' option should not be empty string.")
+  val compressionCodec = parameters.get(COMPRESSION).map(CompressionCodecs.getCodecClassName)
+  val rowTag = parameters.getOrElse(ROW_TAG, XmlOptions.DEFAULT_ROW_TAG)
+  require(rowTag.nonEmpty, s"'$ROW_TAG' option should not be empty string.")
   require(!rowTag.startsWith("<") && !rowTag.endsWith(">"),
-    "'rowTag' should not include angle brackets")
-  val rootTag = parameters.getOrElse("rootTag", XmlOptions.DEFAULT_ROOT_TAG)
+    s"'$ROW_TAG' should not include angle brackets")
+  val rootTag = parameters.getOrElse(ROOT_TAG, XmlOptions.DEFAULT_ROOT_TAG)
   require(!rootTag.startsWith("<") && !rootTag.endsWith(">"),
-    "'rootTag' should not include angle brackets")
-  val declaration = parameters.getOrElse("declaration", XmlOptions.DEFAULT_DECLARATION)
+    s"'$ROOT_TAG' should not include angle brackets")
+  val declaration = parameters.getOrElse(DECLARATION, XmlOptions.DEFAULT_DECLARATION)
   require(!declaration.startsWith("<") && !declaration.endsWith(">"),
-    "'declaration' should not include angle brackets")
-  val arrayElementName = parameters.getOrElse("arrayElementName",
+    s"'$DECLARATION' should not include angle brackets")
+  val arrayElementName = parameters.getOrElse(ARRAY_ELEMENT_NAME,
     XmlOptions.DEFAULT_ARRAY_ELEMENT_NAME)
-  val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
-  require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
-  val excludeAttributeFlag = parameters.get("excludeAttribute").map(_.toBoolean).getOrElse(false)
-  val treatEmptyValuesAsNulls =
-    parameters.get("treatEmptyValuesAsNulls").map(_.toBoolean).getOrElse(false)
+  val samplingRatio = parameters.get(SAMPLING_RATIO).map(_.toDouble).getOrElse(1.0)
+  require(samplingRatio > 0, s"$SAMPLING_RATIO ($samplingRatio) should be greater than 0")
+  val excludeAttributeFlag = getBool(EXCLUDE_ATTRIBUTE, false)
+  val treatEmptyValuesAsNulls = getBool(TREAT_EMPTY_VALUE_AS_NULLS, false)
   val attributePrefix =
-    parameters.getOrElse("attributePrefix", XmlOptions.DEFAULT_ATTRIBUTE_PREFIX)
-  val valueTag = parameters.getOrElse("valueTag", XmlOptions.DEFAULT_VALUE_TAG)
-  require(valueTag.nonEmpty, "'valueTag' option should not be empty string.")
+    parameters.getOrElse(ATTRIBUTE_PREFIX, XmlOptions.DEFAULT_ATTRIBUTE_PREFIX)
+  val valueTag = parameters.getOrElse(VALUE_TAG, XmlOptions.DEFAULT_VALUE_TAG)
+  require(valueTag.nonEmpty, s"'$VALUE_TAG' option should not be empty string.")
   require(valueTag != attributePrefix,
-    "'valueTag' and 'attributePrefix' options should not be the same.")
-  val nullValue = parameters.getOrElse("nullValue", XmlOptions.DEFAULT_NULL_VALUE)
+    s"'$VALUE_TAG' and '$ATTRIBUTE_PREFIX' options should not be the same.")
+  val nullValue = parameters.getOrElse(NULL_VALUE, XmlOptions.DEFAULT_NULL_VALUE)
   val columnNameOfCorruptRecord =
-    parameters.getOrElse("columnNameOfCorruptRecord", "_corrupt_record")
-  val ignoreSurroundingSpaces =
-    parameters.get("ignoreSurroundingSpaces").map(_.toBoolean).getOrElse(false)
-  val parseMode = ParseMode.fromString(parameters.getOrElse("mode", PermissiveMode.name))
-  val inferSchema = parameters.get("inferSchema").map(_.toBoolean).getOrElse(true)
-  val rowValidationXSDPath = parameters.get("rowValidationXSDPath").orNull
+    parameters.getOrElse(COLUMN_NAME_OF_CORRUPT_RECORD, defaultColumnNameOfCorruptRecord)
+  val ignoreSurroundingSpaces = getBool(IGNORE_SURROUNDING_SPACES, false)
+  val parseMode = ParseMode.fromString(parameters.getOrElse(MODE, PermissiveMode.name))
+  val inferSchema = getBool(INFER_SCHEMA, true)
+  val rowValidationXSDPath = parameters.get(ROW_VALIDATION_XSD_PATH).orNull
   val wildcardColName =
-    parameters.getOrElse("wildcardColName", XmlOptions.DEFAULT_WILDCARD_COL_NAME)
-  val ignoreNamespace = parameters.get("ignoreNamespace").map(_.toBoolean).getOrElse(false)
+    parameters.getOrElse(WILDCARD_COL_NAME, XmlOptions.DEFAULT_WILDCARD_COL_NAME)
+  val ignoreNamespace = getBool(IGNORE_NAMESPACE, false)
 
   /**
    * Infer columns with all valid date entries as date type (otherwise inferred as string or
@@ -142,17 +139,6 @@ private[sql] class XmlOptions(
       s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
     })
 
-  // SPARK-39731: Enables the backward compatible parsing behavior.
-  // Generally, this config should be set to false to avoid producing potentially incorrect results
-  // which is the current default (see JacksonParser).
-  //
-  // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`.
-  // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used.
-  // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown and
-  // the value will be parsed as null.
-  val enableDateTimeParsingFallback: Option[Boolean] =
-    parameters.get(ENABLE_DATETIME_PARSING_FALLBACK).map(_.toBoolean)
-
   val timezone = parameters.get("timezone")
 
   val zoneId: ZoneId = DateTimeUtils.getZoneId(
@@ -207,19 +193,34 @@ private[sql] object XmlOptions extends DataSourceOptions {
   val DEFAULT_CHARSET: String = StandardCharsets.UTF_8.name
   val DEFAULT_NULL_VALUE: String = null
   val DEFAULT_WILDCARD_COL_NAME = "xs_any"
+  val ROW_TAG = newOption("rowTag")
+  val ROOT_TAG = newOption("rootTag")
+  val DECLARATION = newOption("declaration")
+  val ARRAY_ELEMENT_NAME = newOption("arrayElementName")
+  val EXCLUDE_ATTRIBUTE = newOption("excludeAttribute")
+  val TREAT_EMPTY_VALUE_AS_NULLS = newOption("treatEmptyValuesAsNulls")
+  val ATTRIBUTE_PREFIX = newOption("attributePrefix")
+  val VALUE_TAG = newOption("valueTag")
+  val NULL_VALUE = newOption("nullValue")
+  val IGNORE_SURROUNDING_SPACES = newOption("ignoreSurroundingSpaces")
+  val ROW_VALIDATION_XSD_PATH = newOption("rowValidationXSDPath")
+  val WILDCARD_COL_NAME = newOption("wildcardColName")
+  val IGNORE_NAMESPACE = newOption("ignoreNamespace")
+  val INFER_SCHEMA = newOption("inferSchema")
   val PREFER_DATE = newOption("preferDate")
+  val MODE = newOption("mode")
   val LOCALE = newOption("locale")
   val COMPRESSION = newOption("compression")
-  val ENABLE_DATETIME_PARSING_FALLBACK = newOption("enableDateTimeParsingFallback")
   val MULTI_LINE = newOption("multiLine")
+  val SAMPLING_RATIO = newOption("samplingRatio")
+  val COLUMN_NAME_OF_CORRUPT_RECORD = newOption("columnNameOfCorruptRecord")
   val DATE_FORMAT = newOption("dateFormat")
   val TIMESTAMP_FORMAT = newOption("timestampFormat")
+  val TIME_ZONE = newOption("timeZone")
   // Options with alternative
   val ENCODING = "encoding"
   val CHARSET = "charset"
   newOption(ENCODING, CHARSET)
-  val TIME_ZONE = "timezone"
-  newOption(DateTimeUtils.TIMEZONE_OPTION, TIME_ZONE)
 
   def apply(parameters: Map[String, String]): XmlOptions =
     new XmlOptions(parameters, SQLConf.get.sessionLocalTimeZone)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
index 7c0e8c6785f..aed0939f04d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
@@ -332,7 +332,7 @@ class XmlSuite extends QueryTest with SharedSparkSession {
     val cars = spark.read.xml(getTestResourcePath(resDir + "cars.xml"))
     cars.write
       .mode(SaveMode.Overwrite)
-      .options(Map("codec" -> classOf[GzipCodec].getName))
+      .options(Map("compression" -> classOf[GzipCodec].getName))
       .xml(copyFilePath.toString)
     // Check that the part file has a .gz extension
     assert(Files.list(copyFilePath).iterator().asScala
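As the XmlSuite change above shows, only the registered spelling is user-facing now ("compression" rather than the old "codec" alias). A hypothetical end-to-end usage sketch, e.g. for spark-shell on a build that includes the built-in XML source; the rowTag value and paths are made up:

import org.apache.spark.sql.SaveMode

// spark-shell already provides `spark`.
val books = spark.read
  .option("rowTag", "book")
  .xml("/tmp/books.xml")

books.write
  .mode(SaveMode.Overwrite)
  .option("compression", "gzip") // "codec" is no longer accepted by the XML source
  .xml("/tmp/books_out.xml")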