[ 
https://issues.apache.org/jira/browse/FLINK-18540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157130#comment-17157130
 ] 

WeiqiangSun commented on FLINK-18540:
-------------------------------------

It should be consider as a side effect of having a retention config.  Every 
message may cause one or two message to emit. It will make big pressure on the 
downstream operator. For example, unnecessary retract messages  may result in 
extremely high IO when sink to a database .

> Unnecessary retract messages when setIdleStateRetentionTime before converting 
> dynamic table to retract stream
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18540
>                 URL: https://issues.apache.org/jira/browse/FLINK-18540
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.10.0
>            Reporter: WeiqiangSun
>            Priority: Critical
>
> Unnecessary retract messages when setIdleStateRetentionTime before converting 
> dynamic table to retract stream。  
>  
> {code:java}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings es = 
> EnvironmentSettings.newInstance().inStreamingMode().build();
> StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(env,es);
> DataStream<Integer> ds = env.fromElements(5,4,3,2,1);
> // if do not setIdleStateRetentionTime, it will output only one message as 
> expected like '(true,5)' 
> // if do setIdleStateRetentionTime, it will output unnecessary retract 
> messages like '(false,5)/(true,5)'
> fsTableEnv.getConfig().setIdleStateRetentionTime(Time.hours(1000000),Time.hours(10000000));
> fsTableEnv.createTemporaryView("test",ds,"id");
> Table test = fsTableEnv.sqlQuery("select max(id) from test");
> fsTableEnv.toRetractStream(test,Row.class).printToErr();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to