This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e211dbdee42c [SPARK-46831][SQL] Collations - Extending StringType and PhysicalStringType with collationId field
e211dbdee42c is described below

commit e211dbdee42c887c99635623a0312857a240ebaa
Author: Aleksandar Tomic <aleksandar.to...@databricks.com>
AuthorDate: Mon Jan 29 17:15:29 2024 +0300

    [SPARK-46831][SQL] Collations - Extending StringType and PhysicalStringType with collationId field

    ### What changes were proposed in this pull request?

    This PR is the initial change for introducing the collation concept into the Spark engine. For a higher-level overview, please take a look at the umbrella [JIRA](https://issues.apache.org/jira/browse/SPARK-46830).

    This PR extends both `StringType` and `PhysicalStringType` with a `collationId` field. At this point it is just a no-op field; in the following PRs it will be used to fetch the right `UTF8String` comparison rules from a global collation table.

    The goal is to keep backwards compatibility. This is ensured by keeping the singleton `object StringType`, which inherits from `StringType(DEFAULT_COLLATION_ID)`. `DEFAULT_COLLATION_ID` represents the UTF8 binary collation rules (i.e. byte-for-byte comparison, which is what Spark already uses), so any code that relies on `StringType` stays binary compatible with this version.

    It may be hard to see the end state from this initial PR alone. Reviewers who want to see how this fits into the end state can take a look at this draft [PR](https://github.com/apache/spark/pull/44537).

    ### Why are the changes needed?

    Please refer to the umbrella JIRA ticket for the collation effort.

    ### Does this PR introduce _any_ user-facing change?

    At this point, no.

    ### How was this patch tested?

    This initial PR doesn't introduce any surface-level changes.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #44901 from dbatomic/string_with_collation_type.

    Authored-by: Aleksandar Tomic <aleksandar.to...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 project/MimaExcludes.scala                                     |  2 ++
 .../main/scala/org/apache/spark/sql/types/StringType.scala     |  9 ++++++---
 .../scala/org/apache/spark/sql/catalyst/InternalRow.scala      |  2 +-
 .../catalyst/expressions/InterpretedUnsafeProjection.scala     |  2 +-
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala       |  4 ++--
 .../org/apache/spark/sql/catalyst/expressions/literals.scala   |  2 +-
 .../spark/sql/catalyst/expressions/namedExpressions.scala      |  2 +-
 .../apache/spark/sql/catalyst/types/PhysicalDataType.scala     | 12 +++++++-----
 .../org/apache/spark/sql/execution/columnar/ColumnType.scala   |  3 ++-
 .../spark/sql/execution/columnar/ColumnarDataTypeUtils.scala   |  2 +-
 10 files changed, 24 insertions(+), 16 deletions(-)
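Before the diff itself, a minimal usage sketch of the API being added, based only on what `StringType.scala` gains below; the collation id 42 is purely illustrative, since ids are only wired to real comparison rules in follow-up PRs:

```scala
import org.apache.spark.sql.types.StringType

object StringTypeCollationSketch {
  def main(args: Array[String]): Unit = {
    // Existing code keeps referring to the singleton, which carries the
    // default (UTF8 binary, byte-for-byte) collation id.
    val plain: StringType = StringType
    assert(plain.collationId == StringType.DEFAULT_COLLATION_ID)

    // New code can request a specific collation id through the new apply().
    // The id 42 is illustrative only.
    val collated: StringType = StringType(42)
    assert(collated.collationId == 42)
  }
}
```

Nothing here changes runtime behaviour yet: the id is merely carried around until later PRs consult the global collation table.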
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index eb4c130cc6a9..43723742be97 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -107,6 +107,8 @@ object MimaExcludes {
     // SPARK-46410: Assign error classes/subclasses to JdbcUtils.classifyException
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.JdbcDialect.classifyException"),
+    // [SPARK-464878][CORE][SQL] (false alert). Invalid rule for StringType extension.
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.types.StringType.this"),
     (problem: Problem) => problem match {
       case MissingClassProblem(cls) =>
         !cls.fullName.startsWith("org.sparkproject.jpmml") &&
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala
index 5985238a863e..bd2ff8475741 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala
@@ -23,9 +23,10 @@ import org.apache.spark.annotation.Stable
  * The data type representing `String` values. Please use the singleton `DataTypes.StringType`.
  *
  * @since 1.3.0
+ * @param collationId The id of collation for this StringType.
  */
 @Stable
-class StringType private() extends AtomicType {
+class StringType private(val collationId: Int) extends AtomicType {
   /**
    * The default size of a value of the StringType is 20 bytes.
    */
@@ -38,5 +39,7 @@ class StringType private() extends AtomicType {
  * @since 1.3.0
  */
 @Stable
-case object StringType extends StringType
-
+case object StringType extends StringType(0) {
+  val DEFAULT_COLLATION_ID = 0
+  def apply(collationId: Int): StringType = new StringType(collationId)
+}
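The edits in the next several files are mechanical and share one cause: `PhysicalStringType` stops being a single case object and becomes a case class parameterized by `collationId`, so value matches on the old object turn into type patterns or extractors. A toy sketch of that shape change, using simplified stand-in types rather than the real Spark classes:

```scala
// Stand-in types, not the real Spark classes; they only mirror the shape change.
sealed trait PhysicalType
case class PhysicalStringType(collationId: Int) extends PhysicalType
case object PhysicalBinaryType extends PhysicalType

object MatchSketch {
  def isStringLike(t: PhysicalType): Boolean = t match {
    // Previously `case PhysicalStringType =>` matched the lone case object.
    // With a parameterized case class, callers that do not care about the
    // collation match on the type (`_: PhysicalStringType`) or use an
    // extractor that ignores the id (`PhysicalStringType(_)`).
    case _: PhysicalStringType => true
    case PhysicalBinaryType => false
  }
}
```

Call sites that do care about the collation (for example `ColumnarDataTypeUtils` at the end of this diff) bind the id instead of ignoring it.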
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index 15efc30dd3ea..5158cdb276fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -140,7 +140,7 @@ object InternalRow {
     case PhysicalLongType => (input, ordinal) => input.getLong(ordinal)
     case PhysicalFloatType => (input, ordinal) => input.getFloat(ordinal)
     case PhysicalDoubleType => (input, ordinal) => input.getDouble(ordinal)
-    case PhysicalStringType => (input, ordinal) => input.getUTF8String(ordinal)
+    case _: PhysicalStringType => (input, ordinal) => input.getUTF8String(ordinal)
     case PhysicalBinaryType => (input, ordinal) => input.getBinary(ordinal)
     case PhysicalCalendarIntervalType => (input, ordinal) => input.getInterval(ordinal)
     case t: PhysicalDecimalType => (input, ordinal) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
index 3dcc775d6ab2..96b60efda2b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
@@ -161,7 +161,7 @@ object InterpretedUnsafeProjection {
       case PhysicalBinaryType => (v, i) => writer.write(i, v.getBinary(i))
-      case PhysicalStringType => (v, i) => writer.write(i, v.getUTF8String(i))
+      case _: PhysicalStringType => (v, i) => writer.write(i, v.getUTF8String(i))
       case PhysicalVariantType => (v, i) => writer.write(i, v.getVariant(i))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index c8c2d5558b14..5d04b0d6d95a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1642,7 +1642,7 @@ object CodeGenerator extends Logging {
     case t: PhysicalDecimalType => s"$input.getDecimal($ordinal, ${t.precision}, ${t.scale})"
     case _: PhysicalMapType => s"$input.getMap($ordinal)"
     case PhysicalNullType => "null"
-    case PhysicalStringType => s"$input.getUTF8String($ordinal)"
+    case _: PhysicalStringType => s"$input.getUTF8String($ordinal)"
     case t: PhysicalStructType => s"$input.getStruct($ordinal, ${t.fields.length})"
     case PhysicalVariantType => s"$input.getVariant($ordinal)"
     case _ => s"($jt)$input.get($ordinal, null)"
@@ -1930,7 +1930,7 @@ object CodeGenerator extends Logging {
     case PhysicalLongType => JAVA_LONG
     case _: PhysicalMapType => "MapData"
     case PhysicalShortType => JAVA_SHORT
-    case PhysicalStringType => "UTF8String"
+    case _: PhysicalStringType => "UTF8String"
     case _: PhysicalStructType => "InternalRow"
     case _: PhysicalVariantType => "VariantVal"
     case _ => "Object"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 6c72afae91e9..940ba42b1a28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -237,7 +237,7 @@ object Literal {
       }
       case PhysicalNullType => true
       case PhysicalShortType => v.isInstanceOf[Short]
-      case PhysicalStringType => v.isInstanceOf[UTF8String]
+      case _: PhysicalStringType => v.isInstanceOf[UTF8String]
       case PhysicalVariantType => v.isInstanceOf[VariantVal]
       case st: PhysicalStructType =>
         v.isInstanceOf[InternalRow] && {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 604494517518..6bbeba4d2969 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -572,7 +572,7 @@ object FileSourceMetadataAttribute {
   def isSupportedType(dataType: DataType): Boolean = PhysicalDataType(dataType) match {
     // PhysicalPrimitiveType covers: Boolean, Byte, Double, Float, Integer, Long, Null, Short
     case _: PhysicalPrimitiveType | _: PhysicalDecimalType => true
-    case PhysicalBinaryType | PhysicalStringType | PhysicalCalendarIntervalType => true
+    case PhysicalBinaryType | PhysicalStringType(_) | PhysicalCalendarIntervalType => true
     case _ => false
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
index 290a35eb8e3b..5a3256a7915f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
@@ -40,9 +40,9 @@ object PhysicalDataType {
     case ShortType => PhysicalShortType
     case IntegerType => PhysicalIntegerType
     case LongType => PhysicalLongType
-    case VarcharType(_) => PhysicalStringType
-    case CharType(_) => PhysicalStringType
-    case StringType => PhysicalStringType
+    case VarcharType(_) => PhysicalStringType(StringType.DEFAULT_COLLATION_ID)
+    case CharType(_) => PhysicalStringType(StringType.DEFAULT_COLLATION_ID)
+    case s: StringType => PhysicalStringType(s.collationId)
     case FloatType => PhysicalFloatType
     case DoubleType => PhysicalDoubleType
     case DecimalType.Fixed(p, s) => PhysicalDecimalType(p, s)
@@ -258,7 +258,7 @@ class PhysicalShortType() extends PhysicalIntegralType with PhysicalPrimitiveTyp
 }
 case object PhysicalShortType extends PhysicalShortType
-class PhysicalStringType() extends PhysicalDataType {
+case class PhysicalStringType(collationId: Int) extends PhysicalDataType {
   // The companion object and this class is separated so the companion object also subclasses
   // this type. Otherwise, the companion object would be of type "StringType$" in byte code.
   // Defined with a private constructor so the companion object is the only possible instantiation.
@@ -266,7 +266,9 @@ class PhysicalStringType() extends PhysicalDataType {
   private[sql] val ordering = implicitly[Ordering[InternalType]]
   @transient private[sql] lazy val tag = typeTag[InternalType]
 }
-case object PhysicalStringType extends PhysicalStringType
+object PhysicalStringType {
+  def apply(collationId: Int): PhysicalStringType = new PhysicalStringType(collationId)
+}
 case class PhysicalArrayType(
     elementType: DataType,
     containsNull: Boolean) extends PhysicalDataType {
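A hedged sketch of what the new mapping above implies; it assumes placement inside Spark's own catalyst sources (the physical types are internal), and the collation ids used are illustrative only:

```scala
package org.apache.spark.sql.catalyst.types

import org.apache.spark.sql.types.{CharType, StringType, VarcharType}

// Lives in Spark's own catalyst package because the physical types are not
// public API; the collation ids below are illustrative only.
object PhysicalMappingSketch {
  def main(args: Array[String]): Unit = {
    // The collation id of a logical StringType flows into the physical type;
    // PhysicalStringType is now a case class, so equality is structural.
    assert(PhysicalDataType(StringType(3)) == PhysicalStringType(3))

    // Char and varchar still map to the default (UTF8 binary) collation.
    assert(PhysicalDataType(CharType(10)) == PhysicalStringType(StringType.DEFAULT_COLLATION_ID))
    assert(PhysicalDataType(VarcharType(10)) == PhysicalStringType(StringType.DEFAULT_COLLATION_ID))
  }
}
```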
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index 53cb568d2060..06a9fe2b0b5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -492,7 +492,8 @@ private[columnar] trait DirectCopyColumnType[JvmType] extends ColumnType[JvmType
 }
 private[columnar] object STRING
-  extends NativeColumnType(PhysicalStringType, 8) with DirectCopyColumnType[UTF8String] {
+  extends NativeColumnType(PhysicalStringType(StringType.DEFAULT_COLLATION_ID), 8)
+  with DirectCopyColumnType[UTF8String] {
   override def actualSize(row: InternalRow, ordinal: Int): Int = {
     row.getUTF8String(ordinal).numBytes() + 4
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala
index 4c1429e90b95..018ce36eb783 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala
@@ -32,7 +32,7 @@ object ColumnarDataTypeUtils {
     case PhysicalCalendarIntervalType => CalendarIntervalType
     case PhysicalFloatType => FloatType
     case PhysicalDoubleType => DoubleType
-    case PhysicalStringType => StringType
+    case PhysicalStringType(collationId) => StringType(collationId)
     case PhysicalDecimalType(precision, scale) => DecimalType(precision, scale)
     case PhysicalArrayType(elementType, containsNull) => ArrayType(elementType, containsNull)
     case PhysicalStructType(fields) => StructType(fields)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org