Hello,

I use Flink for continuous evaluation of SQL queries on streaming data. One
of our use cases requires running recursive SQL queries, and I am unable to
find a way to change the rowtime attribute of an intermediate result table.

For example, let's assume there is a table T0 with the following schema:
root
 |-- str1: STRING
 |-- int1: BIGINT
 |-- utime: TIMESTAMP(3)
 |-- itime: TIMESTAMP(3) *ROWTIME*

Now, let's create a view V0:
var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");

I want to change the rowtime of V0 from itime to utime. I tried the following:

V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());

but ran into the following exception:

org.apache.flink.table.api.ValidationException: Window properties can only be used on windowed tables.
    at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459) ~[flink-table-api-java-1.11.1.jar:1.11.1]

Any guidance on how to address this?

Regards,
Satyam
