[ 
https://issues.apache.org/jira/browse/FLINK-3612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196435#comment-15196435
 ] 

ASF GitHub Bot commented on FLINK-3612:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1793#discussion_r56256616
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
    @@ -96,24 +93,32 @@ class DataSetAggregate(
         val mappedInput = inputDS.map(aggregateResult._1).name(s"prepare 
$aggString")
         val groupReduceFunction = aggregateResult._2
     
    -    if (groupingKeys.length > 0) {
    +    val result = {
    +      if (groupingKeys.length > 0) {
    +        val inFields = inputType.getFieldNames.asScala.toList
    +        val groupByString = s"groupBy: 
(${grouping.map(inFields(_)).mkString(", ")})"
     
    -      val inFields = inputType.getFieldNames.asScala.toList
    -      val groupByString = s"groupBy: (${grouping.map( inFields(_) 
).mkString(", ")})"
    -
    -      mappedInput.asInstanceOf[DataSet[Row]]
    -        .groupBy(groupingKeys: _*)
    -        .reduceGroup(groupReduceFunction)
    -        .returns(rowTypeInfo)
    +        mappedInput.asInstanceOf[DataSet[Row]]
    +          .groupBy(groupingKeys: _*)
    +          .reduceGroup(groupReduceFunction)
    +          .returns(rowTypeInfo)
               .name(groupByString + ", " + aggString)
    -        .asInstanceOf[DataSet[Any]]
    +          .asInstanceOf[DataSet[Any]]
    +      }
    +      else {
    +        // global aggregation
    +        mappedInput.asInstanceOf[DataSet[Row]]
    +          .reduceGroup(groupReduceFunction)
    +          .returns(rowTypeInfo)
    +          .asInstanceOf[DataSet[Any]]
    +      }
         }
    -    else {
    -      // global aggregation
    -      mappedInput.asInstanceOf[DataSet[Row]]
    -        .reduceGroup(groupReduceFunction)
    -        .returns(rowTypeInfo)
    -        .asInstanceOf[DataSet[Any]]
    +
    +    // if the expected type is not a Row, inject a mapper to convert to 
the expected type
    +    expectedType match {
    +      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
    +        result.map(typeConversion(config, rowTypeInfo, expectedType.get))
    --- End diff --
    
    Add a name for the operator.


> Fix/adjust Table API examples
> -----------------------------
>
>                 Key: FLINK-3612
>                 URL: https://issues.apache.org/jira/browse/FLINK-3612
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Vasia Kalavri
>            Assignee: Vasia Kalavri
>
> The Table API examples are currently failing with different exceptions. We 
> should check whether these signify bugs or whether we need to adjust the 
> examples.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to