zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183162754


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -1271,10 +1268,35 @@ class Dataset[T] private[sql] (
     val colNames: Seq[String] = col1 +: cols
     new RelationalGroupedDataset(
       toDF(),
-      colNames.map(colName => Column(colName).expr),
+      colNames.map(colName => Column(colName)),
       proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
   }

+  /**
+   * (Scala-specific) Reduces the elements of this Dataset using the specified binary function.
+   * The given `func` must be commutative and associative or the result may be non-deterministic.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def reduce(func: (T, T) => T): T = {
+    val list = this
+      .groupByKey(UdfUtils.groupAllUnderBoolTrue())(PrimitiveBooleanEncoder)

Review Comment:
   The current code path is RDD ops; it does not go via `ReduceAggregator`. If we wanted to go via `ReduceAggregator`, we would need `RelationalGroupedDataset#agg(TypedColumn)` support, which is missing today. That code path would actually ignore `Dataset[T]` and always use `RowEncoder` instead. Following this logic, the only remaining problem is how to tell a `TypedColumn` apart from a `Column`, since no extra info needs to be passed to the server. I suggest leaving this as a TODO.
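   To make the groupByKey-based path concrete, here is a minimal, hedged sketch (not the PR's actual implementation) of a `reduce` written against the public `groupByKey`/`reduceGroups` API. The helper name `reduceViaGroupByKey` is made up for illustration, `Encoders.scalaBoolean` stands in for the PR's `PrimitiveBooleanEncoder`, and the constant-key closure stands in for `UdfUtils.groupAllUnderBoolTrue()`:

   ```scala
   import org.apache.spark.sql.{Dataset, Encoders}

   // Hedged sketch only: reduce expressed via the public groupByKey/reduceGroups API,
   // mirroring the approach in the diff above. `reduceViaGroupByKey` is a hypothetical
   // helper; it is not part of the PR.
   def reduceViaGroupByKey[T](ds: Dataset[T])(func: (T, T) => T): T = {
     val reduced = ds
       .groupByKey((_: T) => true)(Encoders.scalaBoolean) // every element lands in one group
       .reduceGroups(func)                                // Dataset[(Boolean, T)] with at most one row
       .collect()
     if (reduced.isEmpty) {
       throw new UnsupportedOperationException("empty collection")
     }
     reduced.head._2 // drop the dummy Boolean key and return the reduced value
   }
   ```

   Grouping everything under a single constant key routes the reduction through the ordinary typed-aggregation machinery, which is why it avoids both the classic RDD path and `ReduceAggregator`.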
########## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala: ########## @@ -1271,10 +1268,35 @@ class Dataset[T] private[sql] ( val colNames: Seq[String] = col1 +: cols new RelationalGroupedDataset( toDF(), - colNames.map(colName => Column(colName).expr), + colNames.map(colName => Column(colName)), proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY) } + /** + * (Scala-specific) Reduces the elements of this Dataset using the specified binary function. + * The given `func` must be commutative and associative or the result may be non-deterministic. + * + * @group action + * @since 3.5.0 + */ + def reduce(func: (T, T) => T): T = { + val list = this + .groupByKey(UdfUtils.groupAllUnderBoolTrue())(PrimitiveBooleanEncoder) Review Comment: The current code path is rdd ops. It does not go via ReduceAggregator. If we want to go via ReduceAggregator, we need to make the agg to support `RelationalGroupedDataset#agg(TypedColumn)`, which is missing today. That code path will actually ignore Dataset[T] and always use `RowEncoder` instead. Following this logic, the only problem left is how could we tell the TypedCol apart from Col as no more extra info needed to pass to the server? Suggest leave it as a TODO. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org