This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1676648  [MINOR][SQL] Use SQLConf.resolver for caseSensitiveResolution/caseInsensitiveResolution
1676648 is described below

commit 167664896d4b0ca656bbb2ab6d5a045411e64cd1
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Mon Oct 4 12:57:48 2021 -0700

    [MINOR][SQL] Use SQLConf.resolver for caseSensitiveResolution/caseInsensitiveResolution

    ### What changes were proposed in this pull request?
    Use `SQLConf.resolver` to obtain `caseSensitiveResolution`/`caseInsensitiveResolution` instead of duplicating that selection logic in a new method.

    ### Why are the changes needed?
    Removes redundant code.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Covered by existing tests.

    Closes #34171 from huaxingao/minor.

    Authored-by: Huaxin Gao <huaxin_...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
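For context before the diff: the patch leans on definitions that already exist in catalyst. Below is a paraphrased sketch of the relevant pieces, wrapped in a hypothetical `ResolverSketch` object so it compiles standalone (the stand-in `caseSensitiveAnalysis` flag replaces the real `spark.sql.caseSensitive` lookup; none of this block is part of the diff itself):

```scala
// Paraphrased sketch, not the actual Spark sources.
object ResolverSketch {
  // From the org.apache.spark.sql.catalyst.analysis package object:
  // a Resolver decides whether two column names refer to the same column.
  type Resolver = (String, String) => Boolean
  val caseSensitiveResolution: Resolver = (a, b) => a == b
  val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Stand-in for reading spark.sql.caseSensitive from the session conf.
  def caseSensitiveAnalysis: Boolean = false

  // SQLConf.resolver already performs the if/else that this patch deletes
  // from PartitioningUtils and JdbcUtils.
  def resolver: Resolver =
    if (caseSensitiveAnalysis) caseSensitiveResolution
    else caseInsensitiveResolution
}
```

Mixing in `SQLConfHelper` gives both objects a `conf` accessor for the active session's `SQLConf`, so each former `columnNameEquality`/`nameEquality` call site collapses to `conf.resolver`.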
---
 .../execution/datasources/PartitioningUtils.scala  | 23 +++++-------------
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 27 +++++++---------------
 2 files changed, 14 insertions(+), 36 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 184e6317..273dc77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -29,14 +29,13 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionValueString
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.unsafe.types.UTF8String
@@ -62,7 +61,7 @@ object PartitionSpec {
   val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionPath])
 }
 
-object PartitioningUtils {
+object PartitioningUtils extends SQLConfHelper {
 
   val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
 
@@ -491,7 +490,7 @@ object PartitioningUtils {
     val timestampTry = Try {
       val unescapedRaw = unescapePathName(raw)
       // the inferred data type is consistent with the default timestamp type
-      val timestampType = SQLConf.get.timestampType
+      val timestampType = conf.timestampType
       // try and parse the date, if no exception occurs this is a candidate to be resolved as
       // TimestampType or TimestampNTZType
       timestampType match {
@@ -556,7 +555,7 @@ object PartitioningUtils {
     SchemaUtils.checkColumnNameDuplication(
       partitionColumns, partitionColumns.mkString(", "), caseSensitive)
 
-    partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
+    partitionColumnsSchema(schema, partitionColumns).foreach {
       field => field.dataType match {
         case _: AtomicType => // OK
         case _ => throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
@@ -570,11 +569,9 @@ object PartitioningUtils {
 
   def partitionColumnsSchema(
       schema: StructType,
-      partitionColumns: Seq[String],
-      caseSensitive: Boolean): StructType = {
-    val equality = columnNameEquality(caseSensitive)
+      partitionColumns: Seq[String]): StructType = {
     StructType(partitionColumns.map { col =>
-      schema.find(f => equality(f.name, col)).getOrElse {
+      schema.find(f => conf.resolver(f.name, col)).getOrElse {
         val schemaCatalog = schema.catalogString
         throw QueryCompilationErrors.partitionColumnNotFoundInSchemaError(col, schemaCatalog)
       }
@@ -610,14 +607,6 @@ object PartitioningUtils {
     }
   }
 
-  private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
-    if (caseSensitive) {
-      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-    } else {
-      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-    }
-  }
-
   /**
    * Given a collection of [[Literal]]s, resolves possible type conflicts by
    * [[findWiderTypeForPartitionColumn]].
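The same substitution repeats in `JdbcUtils` below: every hand-rolled equality function fed into a `find` becomes `conf.resolver`. A minimal self-contained demo of that lookup pattern (the object name and sample columns are hypothetical, not Spark code):

```scala
// Hypothetical demo of the find-by-resolver pattern used in this patch.
object ResolverLookupDemo extends App {
  type Resolver = (String, String) => Boolean
  val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Column names as a case-preserving DBMS stores them.
  val tableColumnNames = Seq("ID", "Name", "CreatedAt")

  // Normalize an incoming column name to the table's own spelling,
  // mirroring what the insert-statement builder does with conf.resolver.
  def normalize(col: String, resolver: Resolver): String =
    tableColumnNames.find(f => resolver(f, col))
      .getOrElse(sys.error(s"Column $col not found"))

  println(normalize("name", caseInsensitiveResolution)) // prints: Name
}
```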
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 7b555bd..d49f4b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localD
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -49,7 +48,7 @@ import org.apache.spark.util.NextIterator
 /**
  * Util functions for JDBC tables.
  */
-object JdbcUtils extends Logging {
+object JdbcUtils extends Logging with SQLConfHelper {
   /**
    * Returns a factory for creating connections to the given JDBC URL.
    *
@@ -131,18 +130,13 @@ object JdbcUtils extends Logging {
     val columns = if (tableSchema.isEmpty) {
       rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
     } else {
-      val columnNameEquality = if (isCaseSensitive) {
-        org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-      } else {
-        org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-      }
       // The generated insert statement needs to follow rddSchema's column sequence and
       // tableSchema's column names. When appending data into some case-sensitive DBMSs like
       // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
       // RDD column names for user convenience.
       val tableColumnNames = tableSchema.get.fieldNames
       rddSchema.fields.map { col =>
-        val normalizedName = tableColumnNames.find(f => columnNameEquality(f, col.name)).getOrElse {
+        val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
           throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
         }
         dialect.quoteIdentifier(normalizedName)
@@ -475,7 +469,7 @@ object JdbcUtils extends Logging {
           val localTimeMicro = TimeUnit.NANOSECONDS.toMicros(
             rawTime.toLocalTime().toNanoOfDay())
           val utcTimeMicro = DateTimeUtils.toUTCTime(
-            localTimeMicro, SQLConf.get.sessionLocalTimeZone)
+            localTimeMicro, conf.sessionLocalTimeZone)
           row.setLong(pos, utcTimeMicro)
         } else {
           row.update(pos, null)
@@ -594,7 +588,7 @@ object JdbcUtils extends Logging {
       stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
 
     case TimestampType =>
-      if (SQLConf.get.datetimeJava8ApiEnabled) {
+      if (conf.datetimeJava8ApiEnabled) {
         (stmt: PreparedStatement, row: Row, pos: Int) =>
           stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
       } else {
@@ -603,7 +597,7 @@ object JdbcUtils extends Logging {
       }
 
     case DateType =>
-      if (SQLConf.get.datetimeJava8ApiEnabled) {
+      if (conf.datetimeJava8ApiEnabled) {
         (stmt: PreparedStatement, row: Row, pos: Int) =>
           stmt.setDate(pos + 1, toJavaDate(localDateToDays(row.getAs[LocalDate](pos))))
       } else {
@@ -812,19 +806,14 @@ object JdbcUtils extends Logging {
       caseSensitive: Boolean,
       createTableColumnTypes: String): Map[String, String] = {
     val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
-    val nameEquality = if (caseSensitive) {
-      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-    } else {
-      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-    }
 
     // checks duplicate columns in the user specified column types.
     SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the createTableColumnTypes option value", nameEquality)
+      userSchema.map(_.name), "in the createTableColumnTypes option value", conf.resolver)
 
     // checks if user specified column names exist in the DataFrame schema
     userSchema.fieldNames.foreach { col =>
-      schema.find(f => nameEquality(f.name, col)).getOrElse {
+      schema.find(f => conf.resolver(f.name, col)).getOrElse {
         throw QueryCompilationErrors.createTableColumnTypesOptionColumnNotFoundInSchemaError(
           col, schema)
       }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org