Thanks for your replies Matthias and Timo.

Converting the Table to DataStream, assigning a new Watermark & Rowtime
attribute, and converting back to Table makes sense. One challenge with
that approach is that Table to DataStream conversion could emit retractable
data stream, however, I think, that can now be handled with the new
TableSource API (in 1.11) that allows TableSource to emit retractions.

I'll try this approach when I migrate to the new API and report back.

Regards,
Satyam

On Tue, Sep 1, 2020 at 4:46 AM Timo Walther <t...@ververica.com> wrote:

> Hi Satyam,
>
> Matthias is right. A rowtime attribute cannot be modified and needs to be
> passed "as is" through the pipeline. The only exceptions are if a newer
> rowtime is offered such as `TUMBLE_ROWTIME` or `MATCH_ROWTIME`. In your
> case, you need to define utime as the time attribute. If this is not
> possible, you either express the computation in regular SQL (with
> non-streaming optimizations) or you go to DataStream API prepare the table
> (assign new watermark and StreamRecord timestamp there) and go back to
> Table API.
>
> I hope this helps.
>
> Regards,
> Timo
>
> On Tue, Sep 1, 2020 at 11:40 AM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Satyam,
>> Thanks for your post. Unfortunately, it looks like you cannot change the
>> rowtime column here. The rowtime is strongly coupled with the Watermarks
>> feature. By changing the rowtime column we cannot ensure that the
>> watermarks are still aligned as Fabian mentioned in [1].
>>
>> @Timo Walther <t...@ververica.com> : Could you verify my findings?
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://stackoverflow.com/questions/52784089/flink-table-sql-api-modify-rowtime-attribute-after-session-window-aggregation
>>
>> On Mon, Aug 31, 2020 at 6:44 PM Satyam Shekhar <satyamshek...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I use Flink for continuous evaluation of SQL queries on streaming data.
>>> One of the use cases requires us to run recursive SQL queries. I am unable
>>> to find a way to edit rowtime time attribute of the intermediate result
>>> table.
>>>
>>> For example, let's assume that there is a table T0 with schema -
>>> root
>>>  |-- str1: STRING
>>>  |-- int1: BIGINT
>>>  |-- utime: TIMESTAMP(3)
>>>  |-- itime: TIMESTAMP(3) *ROWTIME*
>>>
>>> Now, let's create a view V0 -
>>> var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");
>>>
>>> I wish to change the rowtime of V0 from itime to utime. I tried doing -
>>>
>>> V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());
>>>
>>> but ran into the following exception -
>>>
>>> org.apache.flink.table.api.ValidationException: Window properties can
>>> only be used on windowed tables.
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>
>>> Any guidance on how to address this?
>>>
>>> Regards,
>>> Satyam
>>>
>>
>>
>> --
>>
>> Matthias Pohl | Engineer
>>
>> Follow us @VervericaData Ververica <https://www.ververica.com/>
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
>> Wehner
>>
>
>
> --
>
> Timo Walther | Software Engineer
>
> <https://data-artisans.com/>
>
>
> <https://www.ververica.com/>
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>

Reply via email to