stefankandic commented on code in PR #47521:
URL: https://github.com/apache/spark/pull/47521#discussion_r1703731222


##########
sql/core/src/test/resources/sql-tests/results/mode.sql.out:
##########
@@ -182,15 +182,9 @@ struct<mode(col):map<int,string>>
 -- !query
 SELECT mode(col, true) FROM VALUES (map(1, 'a')) AS tab(col)
 -- !query schema
-struct<>
+struct<mode() WITHIN GROUP (ORDER BY col DESC):map<int,string>>

Review Comment:
  I understand the change to the output, but why did the query schema change as well?



##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -989,6 +989,93 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
     }
   }
 
+  for (collation <- Seq("UTF8_LCASE", "UNICODE_CI", "UTF8_BINARY", "")) {
+    for (codeGen <- Seq("NO_CODEGEN", "CODEGEN_ONLY")) {
+      val collationSetup = if (collation.isEmpty) "" else "collate " + 
collation
+
+      test(s"Group by on map containing $collationSetup strings ($codeGen)") {
+        val table = "t"
+
+        withTable(table) {
+          withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codeGen) {
+
+            sql(s"create table $table" +
+              s" (m map<string $collationSetup, string $collationSetup>)")
+            sql(s"insert into $table values (map('aaa', 'AAA'))")
+            sql(s"insert into $table values (map('AAA', 'aaa'))")
+            sql(s"insert into $table values (map('aaa', 'AAA'))")
+            sql(s"insert into $table values (map('bbb', 'BBB'))")
+            sql(s"insert into $table values (map('aAA', 'AaA'))")
+            sql(s"insert into $table values (map('BBb', 'bBB'))")
+            sql(s"insert into $table values (map('aaaa', 'AAA'))")
+
+            val df = sql(s"select count(*) from $table group by m")
+            if (collation.isEmpty ||
+              
CollationFactory.fetchCollation(collation).supportsBinaryEquality) {
+              checkAnswer(df, Seq(Row(2), Row(1), Row(1), Row(1), Row(1), 
Row(1)))
+            } else {
+              checkAnswer(df, Seq(Row(4), Row(2), Row(1)))
+            }
+          }
+        }
+      }
+
+      test(s"Group by on map containing structs with $collationSetup strings 
($codeGen)") {
+        val table = "t"
+
+        withTable(table) {
+          withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codeGen) {
+            sql(s"create table $table" +
+              s" (m map<struct<fld1: string $collationSetup, fld2: string 
$collationSetup>, " +
+              s"struct<fld1: string $collationSetup, fld2: string 
$collationSetup>>)")
+            sql(s"insert into $table values " +
+              s"(map(struct('aaa', 'bbb'), struct('ccc', 'ddd')))")
+            sql(s"insert into $table values " +
+              s"(map(struct('Aaa', 'BBB'), struct('cCC', 'dDd')))")
+            sql(s"insert into $table values " +
+              s"(map(struct('AAA', 'BBb'), struct('cCc', 'DDD')))")
+            sql(s"insert into $table values " +
+              s"(map(struct('aaa', 'bbB'), struct('CCC', 'DDD')))")
+
+            val df = sql(s"select count(*) from $table group by m")
+            if (collation.isEmpty ||
+              
CollationFactory.fetchCollation(collation).supportsBinaryEquality) {
+              checkAnswer(df, Seq(Row(1), Row(1), Row(1), Row(1)))
+            } else {
+              checkAnswer(df, Seq(Row(4)))
+            }
+          }
+        }
+      }
+
+      test(s"Group by on map containing arrays with $collationSetup strings 
($codeGen)") {
+        val table = "t"
+
+        withTable(table) {
+          withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codeGen) {
+            sql(s"create table $table " +
+              s"(m map<array<string $collationSetup>, array<string 
$collationSetup>>)")
+            sql(s"insert into $table values (map(array('aaa', 'bbb'), 
array('ccc', 'ddd')))")
+            sql(s"insert into $table values (map(array('AAA', 'BbB'), 
array('Ccc', 'ddD')))")
+            sql(s"insert into $table values (map(array('AAA', 'BbB', 'Ccc'), 
array('ddD')))")
+            sql(s"insert into $table values (map(array('aAa', 'Bbb'), 
array('CCC', 'DDD')))")
+            sql(s"insert into $table values (map(array('AAa', 'BBb'), 
array('cCC', 'DDd')))")
+            sql(s"insert into $table values (map(array('AAA', 'BBB', 'CCC'), 
array('DDD')))")
+
+            val df = sql(s"select count(*) from $table group by m")
+            if (collation.isEmpty ||
+              
CollationFactory.fetchCollation(collation).supportsBinaryEquality) {
+              checkAnswer(df, Seq(Row(1), Row(1), Row(1), Row(1), Row(1), 
Row(1)))
+            } else {
+              checkAnswer(df, Seq(Row(4), Row(2)))
+            }
+          }
+        }
+      }
+    }
+  }

Review Comment:
  Please also add a test verifying that `ORDER BY` on a map column still fails, since maps remain non-orderable from the user's perspective.



##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -989,6 +989,93 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
     }
   }
 
+  for (collation <- Seq("UTF8_LCASE", "UNICODE_CI", "UTF8_BINARY", "")) {
+    for (codeGen <- Seq("NO_CODEGEN", "CODEGEN_ONLY")) {
+      val collationSetup = if (collation.isEmpty) "" else "collate " + 
collation
+
+      test(s"Group by on map containing $collationSetup strings ($codeGen)") {
+        val table = "t"

Review Comment:
  Please apply the same change to the tests below as well.



##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -989,6 +989,93 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
     }
   }
 
+  for (collation <- Seq("UTF8_LCASE", "UNICODE_CI", "UTF8_BINARY", "")) {
+    for (codeGen <- Seq("NO_CODEGEN", "CODEGEN_ONLY")) {
+      val collationSetup = if (collation.isEmpty) "" else "collate " + 
collation
+
+      test(s"Group by on map containing $collationSetup strings ($codeGen)") {
+        val table = "t"
+
+        withTable(table) {
+          withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codeGen) {
+
+            sql(s"create table $table" +
+              s" (m map<string $collationSetup, string $collationSetup>)")
+            sql(s"insert into $table values (map('aaa', 'AAA'))")
+            sql(s"insert into $table values (map('AAA', 'aaa'))")
+            sql(s"insert into $table values (map('aaa', 'AAA'))")
+            sql(s"insert into $table values (map('bbb', 'BBB'))")
+            sql(s"insert into $table values (map('aAA', 'AaA'))")
+            sql(s"insert into $table values (map('BBb', 'bBB'))")
+            sql(s"insert into $table values (map('aaaa', 'AAA'))")
+
+            val df = sql(s"select count(*) from $table group by m")
+            if (collation.isEmpty ||

Review Comment:
  Can we extract this predicate into a named variable at the top, since it is repeated in each test?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala:
##########
@@ -234,10 +234,69 @@ case object PhysicalLongType extends PhysicalLongType
 
 case class PhysicalMapType(keyType: DataType, valueType: DataType, 
valueContainsNull: Boolean)
     extends PhysicalDataType {
-  override private[sql] def ordering =
-    throw 
QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError("PhysicalMapType")
-  override private[sql] type InternalType = Any
+  override private[sql] def ordering = interpretedOrdering
+  override private[sql] type InternalType = MapData
   @transient private[sql] lazy val tag = typeTag[InternalType]
+
+  @transient
+  private[sql] lazy val interpretedOrdering: Ordering[MapData] = new 
Ordering[MapData] {
+    private[this] val keyOrdering =
+      PhysicalDataType(keyType).ordering.asInstanceOf[Ordering[Any]]
+    private[this] val valuesOrdering =
+      PhysicalDataType(valueType).ordering.asInstanceOf[Ordering[Any]]
+
+    override def compare(x: MapData, y: MapData): Int = {
+      val lengthX = x.numElements()
+      val lengthY = y.numElements()
+      val keyArrayX = x.keyArray()
+      val valueArrayX = x.valueArray()
+      val keyArrayY = y.keyArray()
+      val valueArrayY = y.valueArray()
+      val minLength = math.min(lengthX, lengthY)
+      var i = 0
+      while (i < minLength) {
+        var comp = compareElements(keyArrayX, keyArrayY, keyType, i, 
keyOrdering)
+        if (comp != 0) {
+          return comp
+        }
+        comp = compareElements(valueArrayX, valueArrayY, valueType, i, 
valuesOrdering)
+        if (comp != 0) {
+          return comp
+        }
+
+        i += 1
+      }
+
+      if (lengthX < lengthY) {
+        -1
+      }
+      else if (lengthX > lengthY) {
+        1
+      }
+      else {
+        0
+      }
+    }
+
+    private def compareElements(arrayX: ArrayData, arrayY: ArrayData, 
dataType: DataType,

Review Comment:
  Please follow the Databricks Scala style guide's indentation rules for multi-line method declarations: 
https://github.com/databricks/scala-style-guide?tab=readme-ov-file#indent



##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -989,6 +989,93 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
     }
   }
 
+  for (collation <- Seq("UTF8_LCASE", "UNICODE_CI", "UTF8_BINARY", "")) {
+    for (codeGen <- Seq("NO_CODEGEN", "CODEGEN_ONLY")) {
+      val collationSetup = if (collation.isEmpty) "" else "collate " + 
collation
+
+      test(s"Group by on map containing $collationSetup strings ($codeGen)") {
+        val table = "t"

Review Comment:
   ```suggestion
           val tableName = "t"
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala:
##########
@@ -234,10 +234,69 @@ case object PhysicalLongType extends PhysicalLongType
 
 case class PhysicalMapType(keyType: DataType, valueType: DataType, 
valueContainsNull: Boolean)
     extends PhysicalDataType {
-  override private[sql] def ordering =
-    throw 
QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError("PhysicalMapType")
-  override private[sql] type InternalType = Any
+  override private[sql] def ordering = interpretedOrdering
+  override private[sql] type InternalType = MapData
   @transient private[sql] lazy val tag = typeTag[InternalType]
+
+  @transient
+  private[sql] lazy val interpretedOrdering: Ordering[MapData] = new 
Ordering[MapData] {

Review Comment:
  Can we add a comment here explaining that maps are not in fact orderable, and that this ordering exists 
only to support the GROUP BY scenario?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala:
##########
@@ -234,10 +234,69 @@ case object PhysicalLongType extends PhysicalLongType
 
 case class PhysicalMapType(keyType: DataType, valueType: DataType, 
valueContainsNull: Boolean)
     extends PhysicalDataType {
-  override private[sql] def ordering =
-    throw 
QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError("PhysicalMapType")
-  override private[sql] type InternalType = Any
+  override private[sql] def ordering = interpretedOrdering
+  override private[sql] type InternalType = MapData
   @transient private[sql] lazy val tag = typeTag[InternalType]
+
+  @transient
+  private[sql] lazy val interpretedOrdering: Ordering[MapData] = new 
Ordering[MapData] {
+    private[this] val keyOrdering =
+      PhysicalDataType(keyType).ordering.asInstanceOf[Ordering[Any]]
+    private[this] val valuesOrdering =
+      PhysicalDataType(valueType).ordering.asInstanceOf[Ordering[Any]]
+
+    override def compare(x: MapData, y: MapData): Int = {
+      val lengthX = x.numElements()

Review Comment:
  Can we follow the `PhysicalArrayType` convention and name these variables 
`left` and `right` instead of `x` and `y`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to