[ 
https://issues.apache.org/jira/browse/FLINK-26098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17518841#comment-17518841
 ] 

Marios Trivyzas commented on FLINK-26098:
-----------------------------------------

With the following code, where the {{WatermarkAssignerOperator}} is also used:
{noformat}
Configuration configuration = new Configuration();
configuration.set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofSeconds(100));
EnvironmentSettings settings =
        EnvironmentSettings.newInstance()
                .inStreamingMode()
                .withConfiguration(configuration)
                .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.executeSql(
        "CREATE TABLE Orders (\n"
                + " amount INT,\n"
                + " currency STRING,\n"
                + " rowtime TIMESTAMP(3),\n"
                + " proctime AS PROCTIME(),\n"
                + " WATERMARK FOR rowtime AS rowtime\n"
                + ") WITH (\n"
                + " 'connector' = 'datagen',\n"
                + " 'number-of-rows' = '10000'"
                + ")");
tEnv.executeSql(
        "CREATE TABLE RatesHistory (\n"
                + " currency STRING,\n"
                + " rate INT,\n"
                + " rowtime TIMESTAMP(3),\n"
                + " WATERMARK FOR rowtime AS rowtime,\n"
                + " PRIMARY KEY(currency) NOT ENFORCED\n"
                + ") WITH (\n"
                + " 'connector' = 'datagen',\n"
                + " 'number-of-rows' = '10000'"
                + ")");
TemporalTableFunction ratesHistory =
        tEnv.from("RatesHistory").createTemporalTableFunction($("rowtime"), 
$("currency"));
tEnv.createTemporarySystemFunction("Rates", ratesHistory);

String sinkTableDdl =
        "CREATE TABLE MySink (\n"
                + "  a int\n"
                + ") with (\n"
                + "  'connector' = 'blackhole')";
tEnv.executeSql(sinkTableDdl);
tEnv.executeSql(
        "INSERT INTO MySink "
                + "SELECT amount * r.rate "
                + "FROM Orders AS o  "
                + "JOIN RatesHistory  FOR SYSTEM_TIME AS OF o.rowtime AS r "
                + "ON o.currency = r.currency ");{noformat}
We can see in in the {{processElement}} as well:

!Screenshot_20220407_151012.png!

> TableAPI does not forward idleness configuration from DataStream
> ----------------------------------------------------------------
>
>                 Key: FLINK-26098
>                 URL: https://issues.apache.org/jira/browse/FLINK-26098
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.0, 1.14.3
>            Reporter: Till Rohrmann
>            Assignee: Marios Trivyzas
>            Priority: Major
>         Attachments: Screenshot_20220407_150020.png, 
> Screenshot_20220407_151012.png
>
>
> The TableAPI does not forward the idleness configuration from a DataStream 
> source. That can lead to the halt of processing if all sources are idle 
> because {{WatermarkAssignerOperator}} [1] will never set a channel to active 
> again. The only way to mitigate the problem is to explicitly configure the 
> idleness for table sources via {{table.exec.source.idle-timeout}}. 
> Configuring this value is actually not easy because creating a 
> {{StreamExecutionEnvironment}} via {{create(StreamExecutionEnvironment, 
> TableConfig)}} is deprecated.
> [1] 
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java#L103



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to