[ https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050535#comment-16050535 ]

Fabian Hueske commented on FLINK-6886:
--------------------------------------

Maybe there's another way to fix this problem. I played around a bit and found the following:

The following Table API query is executed correctly:

{code}
val table = stream.toTable(tEnv, 'l, 'i, 'n, 'proctime.proctime)

val windowedTable = table
  .window(Tumble over 2.seconds on 'proctime as 'w)
  .groupBy('w, 'n)
  .select('n, 'i.count as 'cnt, 'w.start as 's, 'w.end as 'e)

val results = windowedTable.toAppendStream[MP](queryConfig)

// POJO
class MP(var s: Timestamp, var e: Timestamp, var cnt: Long, var n: String) {
  def this() { this(null, null, 0, null) }
  override def toString: String = s"$n,${s.toString},${e.toString},$cnt"
}
{code}

whereas the equivalent SQL query fails with the reported exception ("The field types of physical and logical row types do not match"):

{code}
val sqlTable = tEnv.sql(
  s"""SELECT TUMBLE_START(proctime, INTERVAL '2' SECOND) AS s,
     |  TUMBLE_END(proctime, INTERVAL '2' SECOND) AS e,
     |  n,
     |  COUNT(i) AS cnt
     |FROM $table
     |GROUP BY n, TUMBLE(proctime, INTERVAL '2' SECOND)
     |""".stripMargin)

val results = sqlTable.toAppendStream[MP](queryConfig)
{code}

The plans of both queries look similar, but the SQL plan seems to lack the 
correct final projection:

{code}
// Table API plan
== Abstract Syntax Tree ==
LogicalProject(n=[$0], cnt=[AS($1, 'cnt')], s=[AS($2, 's')], e=[AS($3, 'e')])
  LogicalWindowAggregate(group=[{0}], TMP_0=[COUNT($1)])
    LogicalProject(n=[$2], i=[$1], proctime=[$3])
      LogicalTableScan(table=[[_DataStreamTable_0]])

== Optimized Logical Plan ==
DataStreamCalc(select=[n, TMP_0 AS cnt, TMP_1 AS s, TMP_2 AS e])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w, 'proctime, 2000.millis)], select=[n, COUNT(i) AS TMP_0, start('w) AS TMP_1, end('w) AS TMP_2])
    DataStreamCalc(select=[n, i, proctime])
      DataStreamScan(table=[[_DataStreamTable_0]])

// SQL plans
== Abstract Syntax Tree ==
LogicalProject(s=[TUMBLE_START($1)], e=[TUMBLE_END($1)], n=[$0], cnt=[$2])
  LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
    LogicalProject(n=[$2], $f1=[TUMBLE($3, 2000)], i=[$1])
      LogicalTableScan(table=[[UnnamedTable$3]])

== Optimized Logical Plan ==
DataStreamCalc(select=[w$start, w$end, n, cnt])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w$, 'proctime, 2000.millis)], select=[n, COUNT(i) AS cnt, start('w$) AS w$start, end('w$) AS w$end])
    DataStreamCalc(select=[n, proctime, i])
      DataStreamScan(table=[[_DataStreamTable_0]])
{code}

So this doesn't seem to be a fundamental issue with the time attributes or window properties, but rather an issue in the SQL optimization.
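Looking at the two optimized plans, one concrete difference is that the Table API's final Calc renames TMP_1/TMP_2 to the query aliases s and e, while the SQL plan's final Calc keeps the internal window-property names w$start and w$end. The following Flink-free Scala sketch (a simplification for illustration, not Flink's actual conversion logic) shows why a missing rename projection matters for a conversion that binds POJO fields by name:

```scala
// Output fields of the optimized Table API plan (after the final Calc
// renames TMP_1 -> s, TMP_2 -> e):
val tableApiOut = Seq("n", "cnt", "s", "e")

// Output fields of the optimized SQL plan: the final Calc keeps the
// internal window-property names instead of the query's aliases:
val sqlOut = Seq("w$start", "w$end", "n", "cnt")

// Fields of the target POJO MP:
val pojoFields = Seq("s", "e", "cnt", "n")

// POJO fields that cannot be bound to any plan output field by name:
def unmatched(planFields: Seq[String]): Seq[String] =
  pojoFields.filterNot(planFields.contains)

println(unmatched(tableApiOut)) // prints List() -- every POJO field is present
println(unmatched(sqlOut))      // prints List(s, e) -- the aliases were lost
```

Under this (simplified) view, the Table API result maps cleanly onto MP, while the SQL result leaves s and e unbound, which is consistent with the conversion failing only for the SQL query.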

What do you think [~sunjincheng121] and [~jark]?

> Fix Timestamp field can not be selected in event time case when  
> toDataStream[T], `T` not a `Row` Type.
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6886
>                 URL: https://issues.apache.org/jira/browse/FLINK-6886
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Currently, for event-time windows (group/over), when the `SELECT` clause contains a `Timestamp` field and the result is converted with toDataStream[T] where `T` is not a `Row` type (e.g. a `PojoType`), an exception is thrown. This JIRA will fix that bug. For example:
> Group Window on SQL:
> {code}
> SELECT name, max(num) as myMax, TUMBLE_START(rowtime, INTERVAL '5' SECOND) as 
> winStart,TUMBLE_END(rowtime, INTERVAL '5' SECOND) as winEnd FROM T1 GROUP BY 
> name, TUMBLE(rowtime, INTERVAL '5' SECOND)
> {code}
> Thrown exception:
> {code}
> org.apache.flink.table.api.TableException: The field types of physical and 
> logical row types do not match.This is a bug and should not happen. Please 
> file an issue.
>       at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
>       at 
> org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721)
>       at 
> org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247)
>       at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647)
> {code}
> In fact, once this exception is resolved, further exceptions are thrown. The root cause is a bug in the {{TableEnvironment#generateRowConverterFunction}} method, which this JIRA will fix.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
