[ https://issues.apache.org/jira/browse/FLINK-26098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17518841#comment-17518841 ]
Marios Trivyzas commented on FLINK-26098: ----------------------------------------- With the following code, where the {{WatermarkAssignerOperator}} is also used: {noformat} Configuration configuration = new Configuration(); configuration.set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofSeconds(100)); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode() .withConfiguration(configuration) .build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv.executeSql( "CREATE TABLE Orders (\n" + " amount INT,\n" + " currency STRING,\n" + " rowtime TIMESTAMP(3),\n" + " proctime AS PROCTIME(),\n" + " WATERMARK FOR rowtime AS rowtime\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'number-of-rows' = '10000'" + ")"); tEnv.executeSql( "CREATE TABLE RatesHistory (\n" + " currency STRING,\n" + " rate INT,\n" + " rowtime TIMESTAMP(3),\n" + " WATERMARK FOR rowtime AS rowtime,\n" + " PRIMARY KEY(currency) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'number-of-rows' = '10000'" + ")"); TemporalTableFunction ratesHistory = tEnv.from("RatesHistory").createTemporalTableFunction($("rowtime"), $("currency")); tEnv.createTemporarySystemFunction("Rates", ratesHistory); String sinkTableDdl = "CREATE TABLE MySink (\n" + " a int\n" + ") with (\n" + " 'connector' = 'blackhole')"; tEnv.executeSql(sinkTableDdl); tEnv.executeSql( "INSERT INTO MySink " + "SELECT amount * r.rate " + "FROM Orders AS o " + "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " + "ON o.currency = r.currency ");{noformat} We can see in in the {{processElement}} as well: !Screenshot_20220407_151012.png! > TableAPI does not forward idleness configuration from DataStream > ---------------------------------------------------------------- > > Key: FLINK-26098 > URL: https://issues.apache.org/jira/browse/FLINK-26098 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.15.0, 1.14.3 > Reporter: Till Rohrmann > Assignee: Marios Trivyzas > Priority: Major > Attachments: Screenshot_20220407_150020.png, > Screenshot_20220407_151012.png > > > The TableAPI does not forward the idleness configuration from a DataStream > source. That can lead to the halt of processing if all sources are idle > because {{WatermarkAssignerOperator}} [1] will never set a channel to active > again. The only way to mitigate the problem is to explicitly configure the > idleness for table sources via {{table.exec.source.idle-timeout}}. > Configuring this value is actually not easy because creating a > {{StreamExecutionEnvironment}} via {{create(StreamExecutionEnvironment, > TableConfig)}} is deprecated. > [1] > https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java#L103 -- This message was sent by Atlassian Jira (v8.20.1#820001)