[
https://issues.apache.org/jira/browse/FLINK-3612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196435#comment-15196435
]
ASF GitHub Bot commented on FLINK-3612:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1793#discussion_r56256616
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
---
@@ -96,24 +93,32 @@ class DataSetAggregate(
val mappedInput = inputDS.map(aggregateResult._1).name(s"prepare
$aggString")
val groupReduceFunction = aggregateResult._2
- if (groupingKeys.length > 0) {
+ val result = {
+ if (groupingKeys.length > 0) {
+ val inFields = inputType.getFieldNames.asScala.toList
+ val groupByString = s"groupBy:
(${grouping.map(inFields(_)).mkString(", ")})"
- val inFields = inputType.getFieldNames.asScala.toList
- val groupByString = s"groupBy: (${grouping.map( inFields(_)
).mkString(", ")})"
-
- mappedInput.asInstanceOf[DataSet[Row]]
- .groupBy(groupingKeys: _*)
- .reduceGroup(groupReduceFunction)
- .returns(rowTypeInfo)
+ mappedInput.asInstanceOf[DataSet[Row]]
+ .groupBy(groupingKeys: _*)
+ .reduceGroup(groupReduceFunction)
+ .returns(rowTypeInfo)
.name(groupByString + ", " + aggString)
- .asInstanceOf[DataSet[Any]]
+ .asInstanceOf[DataSet[Any]]
+ }
+ else {
+ // global aggregation
+ mappedInput.asInstanceOf[DataSet[Row]]
+ .reduceGroup(groupReduceFunction)
+ .returns(rowTypeInfo)
+ .asInstanceOf[DataSet[Any]]
+ }
}
- else {
- // global aggregation
- mappedInput.asInstanceOf[DataSet[Row]]
- .reduceGroup(groupReduceFunction)
- .returns(rowTypeInfo)
- .asInstanceOf[DataSet[Any]]
+
+ // if the expected type is not a Row, inject a mapper to convert to
the expected type
+ expectedType match {
+ case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+ result.map(typeConversion(config, rowTypeInfo, expectedType.get))
--- End diff --
Add a name for the operator.
> Fix/adjust Table API examples
> -----------------------------
>
> Key: FLINK-3612
> URL: https://issues.apache.org/jira/browse/FLINK-3612
> Project: Flink
> Issue Type: Sub-task
> Components: Table API
> Reporter: Vasia Kalavri
> Assignee: Vasia Kalavri
>
> The Table API examples are currently failing with different exceptions. We
> should check whether these signify bugs or whether we need to adjust the
> examples.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)