zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183162754


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -1271,10 +1268,35 @@ class Dataset[T] private[sql] (
     val colNames: Seq[String] = col1 +: cols
     new RelationalGroupedDataset(
       toDF(),
-      colNames.map(colName => Column(colName).expr),
+      colNames.map(colName => Column(colName)),
       proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
   }
 
+  /**
+   * (Scala-specific) Reduces the elements of this Dataset using the specified 
binary function.
+   * The given `func` must be commutative and associative or the result may be 
non-deterministic.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def reduce(func: (T, T) => T): T = {
+    val list = this
+      .groupByKey(UdfUtils.groupAllUnderBoolTrue())(PrimitiveBooleanEncoder)

Review Comment:
   The current code path is rdd ops. It does not go via ReduceAggregator.
   
   If we want to go via ReduceAggregator, we need to make the agg to support 
`RelationalGroupedDataset#agg(TypedColumn)`, which is missing today. That code 
path will actually ignore Dataset[T] and always use `RowEncoder` instead. 
Following this logic, the only problem left is how could we tell the TypedCol 
apart from Col as no more extra info needed to pass to the server?
   
   Suggest leave it as a TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to