olaky commented on code in PR #46280: URL: https://github.com/apache/spark/pull/46280#discussion_r1601024762
########## python/pyspark/sql/tests/test_types.py: ########## @@ -549,6 +549,129 @@ def test_convert_list_to_str(self): self.assertEqual(df.count(), 1) self.assertEqual(df.head(), Row(name="[123]", income=120)) + def test_schema_with_collations_json_ser_de(self): + from pyspark.sql.types import _parse_datatype_json_string + + unicode_collation = "UNICODE" + + simple_struct = StructType([StructField("c1", StringType(unicode_collation))]) + + nested_struct = StructType([StructField("nested", simple_struct)]) + + array_in_schema = StructType( + [StructField("array", ArrayType(StringType(unicode_collation)))] + ) + + map_in_schema = StructType( + [ + StructField( + "map", MapType(StringType(unicode_collation), StringType(unicode_collation)) + ) + ] + ) + + array_in_map = StructType( + [ + StructField( + "arrInMap", + MapType( + StringType(unicode_collation), ArrayType(StringType(unicode_collation)) + ), + ) + ] + ) + + nested_array_in_map = StructType( Review Comment: ```suggestion nested_array_in_map_value = StructType( ``` ########## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java: ########## @@ -36,11 +36,45 @@ * Provides functionality to the UTF8String object which respects defined collation settings. */ public final class CollationFactory { + + /** + * Identifier for single a collation. 
+ */ + public static class CollationIdentifier { + public final String provider; + public final String name; + public final String version; + + public CollationIdentifier(String provider, String collationName, String version) { + this.provider = provider; + this.name = collationName; + this.version = version; + } + + public static CollationIdentifier fromString(String identifier) { + String[] parts = identifier.split("\\.", 3); + return new CollationIdentifier(parts[0], parts[1], parts[2]); + } + + @Override + public String toString() { + return String.format("%s.%s.%s", provider, name, version); + } + + /** + * Returns the identifier's string value without the version. + */ + public String valueWithoutVersion() { Review Comment: ```suggestion public String toStringWithoutVersion() { ``` ########## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java: ########## @@ -36,11 +36,45 @@ * Provides functionality to the UTF8String object which respects defined collation settings. */ public final class CollationFactory { + + /** + * Identifier for single a collation. + */ + public static class CollationIdentifier { + public final String provider; + public final String name; + public final String version; Review Comment: How are we representing identifiers without version? Should we make this Optional[String]? 
The provider might also be optional, but this will make parsing a lot more difficult ########## python/pyspark/sql/tests/test_types.py: ########## @@ -549,6 +549,129 @@ def test_convert_list_to_str(self): self.assertEqual(df.count(), 1) self.assertEqual(df.head(), Row(name="[123]", income=120)) + def test_schema_with_collations_json_ser_de(self): + from pyspark.sql.types import _parse_datatype_json_string + + unicode_collation = "UNICODE" + + simple_struct = StructType([StructField("c1", StringType(unicode_collation))]) + + nested_struct = StructType([StructField("nested", simple_struct)]) + + array_in_schema = StructType( + [StructField("array", ArrayType(StringType(unicode_collation)))] + ) + + map_in_schema = StructType( + [ + StructField( + "map", MapType(StringType(unicode_collation), StringType(unicode_collation)) + ) + ] + ) + + array_in_map = StructType( + [ + StructField( + "arrInMap", + MapType( + StringType(unicode_collation), ArrayType(StringType(unicode_collation)) + ), + ) + ] + ) + + nested_array_in_map = StructType( + [ + StructField( + "nestedArrayInMap", + ArrayType( + MapType( + StringType(unicode_collation), Review Comment: What about nested collations in map keys? ########## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java: ########## @@ -137,6 +172,11 @@ public Collation( supportsBinaryOrdering, supportsLowercaseEquality); } + + /** Returns the collation identifier*/ Review Comment: ```suggestion /** Returns the collation identifier. 
*/ ``` ########## python/pyspark/sql/types.py: ########## @@ -263,21 +263,30 @@ def __init__(self, collation: Optional[str] = None): def fromCollationId(self, collationId: int) -> "StringType": return StringType(StringType.collationNames[collationId]) - def collationIdToName(self) -> str: - if self.collationId == 0: - return "" - else: - return " collate %s" % StringType.collationNames[self.collationId] + @classmethod + def collationIdToName(cls, collationId: int) -> str: + return StringType.collationNames[collationId] @classmethod def collationNameToId(cls, collationName: str) -> int: return StringType.collationNames.index(collationName) + @classmethod + def collationProvider(cls, collationName: str) -> str: + if collationName.startswith("UTF8"): Review Comment: This looks like it can break easily. Can we rather have a list of names for collations provided by Spark? And have that in the same place where collations are defined, such that it can be discovered? ########## python/pyspark/sql/tests/test_types.py: ########## @@ -549,6 +549,129 @@ def test_convert_list_to_str(self): self.assertEqual(df.count(), 1) self.assertEqual(df.head(), Row(name="[123]", income=120)) + def test_schema_with_collations_json_ser_de(self): + from pyspark.sql.types import _parse_datatype_json_string + + unicode_collation = "UNICODE" + + simple_struct = StructType([StructField("c1", StringType(unicode_collation))]) + + nested_struct = StructType([StructField("nested", simple_struct)]) + + array_in_schema = StructType( + [StructField("array", ArrayType(StringType(unicode_collation)))] + ) + + map_in_schema = StructType( + [ + StructField( + "map", MapType(StringType(unicode_collation), StringType(unicode_collation)) + ) + ] + ) + + array_in_map = StructType( + [ + StructField( + "arrInMap", + MapType( + StringType(unicode_collation), ArrayType(StringType(unicode_collation)) + ), + ) + ] + ) + + nested_array_in_map = StructType( + [ + StructField( + "nestedArrayInMap", + ArrayType( + 
MapType( + StringType(unicode_collation), + ArrayType(ArrayType(StringType(unicode_collation))), + ) + ), + ) + ] + ) + + schema_with_multiple_fields = StructType( + simple_struct.fields + + nested_struct.fields + + array_in_schema.fields + + map_in_schema.fields + + array_in_map.fields + + nested_array_in_map.fields + ) + + schemas = [ + simple_struct, + nested_struct, + array_in_schema, + map_in_schema, + nested_array_in_map, + array_in_map, + schema_with_multiple_fields, + ] + + for schema in schemas: + scala_datatype = self.spark._jsparkSession.parseDataType(schema.json()) + python_datatype = _parse_datatype_json_string(scala_datatype.json()) + assert schema == python_datatype + assert schema == _parse_datatype_json_string(schema.json()) + + def test_schema_with_collations_on_non_string_types(self): + from pyspark.sql.types import _parse_datatype_json_string, _COLLATIONS_METADATA_KEY + + collations_on_int_col_json = f""" + {{ + "type": "struct", + "fields": [ + {{ + "name": "c1", + "type": "integer", + "nullable": true, + "metadata": {{ + "{_COLLATIONS_METADATA_KEY}": {{ + "c1": "icu.UNICODE" + }} + }} + }} + ] + }} + """ + + collations_in_map_json = f""" Review Comment: we should also test array element to complete this. Maybe let's make the array element one nested? 
For example, an array containing a map whose key is a collated int. ########## python/pyspark/sql/tests/test_types.py: ########## @@ -549,6 +549,129 @@ def test_convert_list_to_str(self): self.assertEqual(df.count(), 1) self.assertEqual(df.head(), Row(name="[123]", income=120)) + def test_schema_with_collations_json_ser_de(self): + from pyspark.sql.types import _parse_datatype_json_string + + unicode_collation = "UNICODE" + + simple_struct = StructType([StructField("c1", StringType(unicode_collation))]) + + nested_struct = StructType([StructField("nested", simple_struct)]) + + array_in_schema = StructType( + [StructField("array", ArrayType(StringType(unicode_collation)))] + ) + + map_in_schema = StructType( + [ + StructField( + "map", MapType(StringType(unicode_collation), StringType(unicode_collation)) + ) + ] + ) + + array_in_map = StructType( + [ + StructField( + "arrInMap", + MapType( + StringType(unicode_collation), ArrayType(StringType(unicode_collation)) + ), + ) + ] + ) + + nested_array_in_map = StructType( + [ + StructField( + "nestedArrayInMap", + ArrayType( + MapType( + StringType(unicode_collation), + ArrayType(ArrayType(StringType(unicode_collation))), + ) + ), + ) + ] + ) + + schema_with_multiple_fields = StructType( + simple_struct.fields + + nested_struct.fields + + array_in_schema.fields + + map_in_schema.fields + + array_in_map.fields + + nested_array_in_map.fields + ) + + schemas = [ + simple_struct, + nested_struct, + array_in_schema, + map_in_schema, + nested_array_in_map, + array_in_map, + schema_with_multiple_fields, + ] + + for schema in schemas: + scala_datatype = self.spark._jsparkSession.parseDataType(schema.json()) + python_datatype = _parse_datatype_json_string(scala_datatype.json()) + assert schema == python_datatype + assert schema == _parse_datatype_json_string(schema.json()) + + def test_schema_with_collations_on_non_string_types(self): + from pyspark.sql.types import _parse_datatype_json_string, _COLLATIONS_METADATA_KEY + + 
collations_on_int_col_json = f""" + {{ + "type": "struct", + "fields": [ + {{ + "name": "c1", + "type": "integer", + "nullable": true, + "metadata": {{ + "{_COLLATIONS_METADATA_KEY}": {{ + "c1": "icu.UNICODE" + }} + }} + }} + ] + }} + """ + + collations_in_map_json = f""" + {{ + "type": "struct", + "fields": [ + {{ + "name": "mapField", + "type": {{ + "type": "map", + "keyType": "string", + "valueType": "integer", + "valueContainsNull": true + }}, + "nullable": true, + "metadata": {{ + "{_COLLATIONS_METADATA_KEY}": {{ + "mapField.value": "icu.UNICODE" + }} + }} + }} + ] + }} + """ Review Comment: the struct fields are all top level / columns here. Let's have some nested ones (i.e. struct with struct field with struct field) ########## python/pyspark/sql/types.py: ########## @@ -693,8 +705,16 @@ def jsonValue(self) -> Dict[str, Any]: } @classmethod - def fromJson(cls, json: Dict[str, Any]) -> "ArrayType": - return ArrayType(_parse_datatype_json_value(json["elementType"]), json["containsNull"]) + def fromJson( + cls, + json: Dict[str, Any], + fieldPath: str = "", Review Comment: Are default arguments a good idea here? 
Specifically for collationsMap: if callers are not required to provide it, we can miss code paths where it is actually needed. ########## sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala: ########## @@ -61,6 +63,10 @@ class StringType private(val collationId: Int) extends AtomicType with Serializa if (isUTF8BinaryCollation) "string" else s"string collate ${CollationFactory.fetchCollation(collationId).collationName}" + // Due to backwards compatibility all string types are serialized in json as Review Comment: Please also mention compatibility with other readers, and that the collation metadata is stored in the StructField. ########## python/pyspark/sql/tests/test_types.py: ########## @@ -549,6 +549,129 @@ def test_convert_list_to_str(self): self.assertEqual(df.count(), 1) self.assertEqual(df.head(), Row(name="[123]", income=120)) + def test_schema_with_collations_json_ser_de(self): + from pyspark.sql.types import _parse_datatype_json_string + + unicode_collation = "UNICODE" + + simple_struct = StructType([StructField("c1", StringType(unicode_collation))]) + + nested_struct = StructType([StructField("nested", simple_struct)]) + + array_in_schema = StructType( + [StructField("array", ArrayType(StringType(unicode_collation)))] + ) + + map_in_schema = StructType( + [ + StructField( + "map", MapType(StringType(unicode_collation), StringType(unicode_collation)) + ) + ] + ) + + array_in_map = StructType( + [ + StructField( + "arrInMap", + MapType( + StringType(unicode_collation), ArrayType(StringType(unicode_collation)) + ), + ) + ] + ) + + nested_array_in_map = StructType( + [ + StructField( + "nestedArrayInMap", + ArrayType( + MapType( + StringType(unicode_collation), + ArrayType(ArrayType(StringType(unicode_collation))), + ) + ), + ) + ] + ) + + schema_with_multiple_fields = StructType( + simple_struct.fields + + nested_struct.fields + + array_in_schema.fields + + map_in_schema.fields + + array_in_map.fields + + nested_array_in_map.fields + ) + + schemas = [ + simple_struct, + 
nested_struct, + array_in_schema, + map_in_schema, + nested_array_in_map, + array_in_map, + schema_with_multiple_fields, + ] + + for schema in schemas: + scala_datatype = self.spark._jsparkSession.parseDataType(schema.json()) + python_datatype = _parse_datatype_json_string(scala_datatype.json()) + assert schema == python_datatype + assert schema == _parse_datatype_json_string(schema.json()) + + def test_schema_with_collations_on_non_string_types(self): + from pyspark.sql.types import _parse_datatype_json_string, _COLLATIONS_METADATA_KEY + + collations_on_int_col_json = f""" + {{ + "type": "struct", + "fields": [ + {{ + "name": "c1", + "type": "integer", + "nullable": true, + "metadata": {{ + "{_COLLATIONS_METADATA_KEY}": {{ + "c1": "icu.UNICODE" + }} + }} + }} + ] + }} + """ + + collations_in_map_json = f""" + {{ + "type": "struct", + "fields": [ + {{ + "name": "mapField", + "type": {{ + "type": "map", + "keyType": "string", + "valueType": "integer", + "valueContainsNull": true + }}, + "nullable": true, + "metadata": {{ + "{_COLLATIONS_METADATA_KEY}": {{ + "mapField.value": "icu.UNICODE" Review Comment: what about duplicate keys in this json object (should be a protocol error) ########## python/pyspark/sql/types.py: ########## @@ -263,21 +263,30 @@ def __init__(self, collation: Optional[str] = None): def fromCollationId(self, collationId: int) -> "StringType": return StringType(StringType.collationNames[collationId]) - def collationIdToName(self) -> str: - if self.collationId == 0: - return "" - else: - return " collate %s" % StringType.collationNames[self.collationId] + @classmethod + def collationIdToName(cls, collationId: int) -> str: + return StringType.collationNames[collationId] @classmethod def collationNameToId(cls, collationName: str) -> int: return StringType.collationNames.index(collationName) + @classmethod + def collationProvider(cls, collationName: str) -> str: + if collationName.startswith("UTF8"): + return "spark" + return "icu" + def 
simpleString(self) -> str: - return "string" + self.collationIdToName() + if self.isUTF8BinaryCollation(): + return "string" + + return f"string collate ${self.collationIdToName(self.collationId)}" + # Due to backwards compatibility all string types are serialized in json as Review Comment: ```suggestion # For backwards compatibility and compatibility with other readers all string types are serialized in json as ``` ########## sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala: ########## @@ -208,22 +206,33 @@ object DataType { } // NOTE: Map fields must be sorted in alphabetical order to keep consistent with the Python side. - private[sql] def parseDataType(json: JValue): DataType = json match { + private[sql] def parseDataType( + json: JValue, + fieldPath: String = "", Review Comment: single string? How do we resolve ambiguity with dots in names? ########## sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala: ########## @@ -208,22 +206,33 @@ object DataType { } // NOTE: Map fields must be sorted in alphabetical order to keep consistent with the Python side. - private[sql] def parseDataType(json: JValue): DataType = json match { + private[sql] def parseDataType( + json: JValue, + fieldPath: String = "", + collationsMap: Map[String, String] = Map.empty): DataType = json match { case JString(name) => - nameToType(name) + collationsMap.get(fieldPath) match { Review Comment: I believe names in the schema are case insensitive (or at least can be). Can this cause us a problem here? 
I guess not because we are building and reading this from the same instance of a schema, but want to double check ########## sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala: ########## @@ -606,4 +612,181 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper { "b STRING NOT NULL,c STRING COMMENT 'nullable comment'") assert(fromDDL(struct.toDDL) === struct) } + + test("simple struct with collations to json") { + val simpleStruct = StructType( + StructField("c1", StringType(UNICODE_COLLATION)) :: Nil) + + val expectedJson = + s""" + |{ + | "type": "struct", + | "fields": [ + | { + | "name": "c1", Review Comment: Let's also have tests with names with dots. Also what about tests looking at case sensitivity? And special characters? ########## python/pyspark/sql/types.py: ########## @@ -1702,9 +1791,16 @@ def _parse_datatype_json_string(json_string: str) -> DataType: return _parse_datatype_json_value(json.loads(json_string)) -def _parse_datatype_json_value(json_value: Union[dict, str]) -> DataType: +def _parse_datatype_json_value( + json_value: Union[dict, str], + fieldPath: str = "", + collationsMap: Optional[Dict[str, str]] = None, +) -> DataType: if not isinstance(json_value, dict): if json_value in _all_atomic_types.keys(): + if collationsMap is not None and fieldPath in collationsMap: + collationName = collationsMap[fieldPath].split(".")[1] Review Comment: How about an error here if the provider is not Spark or ICU? 
########## python/pyspark/sql/tests/test_types.py: ########## @@ -549,6 +549,129 @@ def test_convert_list_to_str(self): self.assertEqual(df.count(), 1) self.assertEqual(df.head(), Row(name="[123]", income=120)) + def test_schema_with_collations_json_ser_de(self): + from pyspark.sql.types import _parse_datatype_json_string + + unicode_collation = "UNICODE" + + simple_struct = StructType([StructField("c1", StringType(unicode_collation))]) + + nested_struct = StructType([StructField("nested", simple_struct)]) + + array_in_schema = StructType( + [StructField("array", ArrayType(StringType(unicode_collation)))] + ) + + map_in_schema = StructType( + [ + StructField( + "map", MapType(StringType(unicode_collation), StringType(unicode_collation)) + ) + ] + ) + + array_in_map = StructType( + [ + StructField( + "arrInMap", + MapType( + StringType(unicode_collation), ArrayType(StringType(unicode_collation)) + ), + ) + ] + ) + + nested_array_in_map = StructType( + [ + StructField( + "nestedArrayInMap", + ArrayType( + MapType( + StringType(unicode_collation), + ArrayType(ArrayType(StringType(unicode_collation))), + ) + ), + ) + ] + ) + + schema_with_multiple_fields = StructType( + simple_struct.fields + + nested_struct.fields + + array_in_schema.fields + + map_in_schema.fields + + array_in_map.fields + + nested_array_in_map.fields + ) + + schemas = [ + simple_struct, + nested_struct, + array_in_schema, + map_in_schema, + nested_array_in_map, + array_in_map, + schema_with_multiple_fields, + ] + + for schema in schemas: + scala_datatype = self.spark._jsparkSession.parseDataType(schema.json()) Review Comment: So we are testing that what the Scala code serializes can be deserialized by the Python code. Do we also test the other direction, i.e. that the Scala code can deserialize what the Python code serializes? 
########## sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala: ########## @@ -63,7 +66,60 @@ case class StructField( ("name" -> name) ~ ("type" -> dataType.jsonValue) ~ ("nullable" -> nullable) ~ - ("metadata" -> metadata.jsonValue) + ("metadata" -> metadataJson) + } + + private def metadataJson: JValue = { + val metadataJsonValue = metadata.jsonValue + metadataJsonValue match { + case JObject(fields) if collationMetadata.nonEmpty => + val collationFields = collationMetadata.map(kv => kv._1 -> JString(kv._2)).toList + JObject(fields :+ (DataType.COLLATIONS_METADATA_KEY -> JObject(collationFields))) + + case _ => metadataJsonValue + } + } + + /** Map of field path to collation name. */ + private lazy val collationMetadata: mutable.Map[String, String] = { + val fieldToCollationMap = mutable.Map[String, String]() + + def visitRecursively(dt: DataType, path: String): Unit = dt match { + case at: ArrayType => + processDataType(at.elementType, path + ".element") + + case mt: MapType => + processDataType(mt.keyType, path + ".key") + processDataType(mt.valueType, path + ".value") + + case st: StringType if isCollatedString(st) => + fieldToCollationMap(path) = collationName(st) + + case _ => + } + + def processDataType(dt: DataType, path: String): Unit = { + if (isCollatedString(dt)) { + fieldToCollationMap(path) = collationName(dt) + } else { + visitRecursively(dt, path) + } + } + + visitRecursively(dataType, name) Review Comment: what if the field is named `key.element.value`? 
########## python/pyspark/sql/types.py: ########## @@ -876,30 +894,86 @@ def __init__( self.dataType = dataType self.nullable = nullable self.metadata = metadata or {} + self._collationMetadata: Optional[Dict[str, str]] = None def simpleString(self) -> str: return "%s:%s" % (self.name, self.dataType.simpleString()) def __repr__(self) -> str: return "StructField('%s', %s, %s)" % (self.name, self.dataType, str(self.nullable)) + def __eq__(self, other: Any) -> bool: + # since collationMetadata is lazy evaluated we should not use it in equality check Review Comment: For me this also raises the question of whether we should have a _collationMetadata member at all. It is not part of the state of the struct field, and the risk of caching this value is that we might forget to update it. So should we drop the caching instead? We should also have a test for the following sequence: create schema -> serialize to JSON -> update collation id -> serialize to JSON again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org