beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369943722


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -920,6 +920,18 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
         "fieldNames" -> v1Table.schema.fieldNames.mkString(", ")))
   }
 
+  def functionAndOrderExpressionMismatchError(
+      functionName: String,
+      functionExpr: Expression,
+      orderExpr: Expression): Throwable = {
+    new AnalysisException(
+      errorClass = "FUNCTION_AND_ORDER_EXPRESSION_MISMATCH",
+      messageParameters = Map(
+        "functionName" -> toSQLStmt(functionName),

Review Comment:
   ```suggestion
           "functionName" -> toSQLId(functionName),
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, 
null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), 
listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   Please test the empty input.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),

Review Comment:
   Do we really need the default value?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = 
mutable.ArrayBuffer.empty
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+

Review Comment:
   Please put `createAggregationBuffer` here and move 
`withNewMutableAggBufferOffset`, `withNewInputAggBufferOffset` together with 
`createAggregationBuffer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to