[ https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803482#comment-17803482 ]
yong yang commented on FLINK-34001:
-----------------------------------

{code:java}
// code placeholder
package com.yy.state.OperatorStateTTL

import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}

import java.time.ZoneId
import scala.util.Random
// NOTE(review): StreamStatementSet and Random are not referenced anywhere in the code shown below.

/**
 * Reproduction job: explores how EXECUTE PLAN and a statement set can be combined.
 *
 * It registers one Kafka source (s1), one upsert-kafka source (s2) and three print sinks,
 * submits two plain INSERT statements and then executes a previously compiled plan from a JSON file.
 *
 * When adapted for the Flink web UI (job packaged as a jar and submitted there) it fails with:
 * Cannot have more than one execute() or executeAsync() call in a single environment
 * Running it from IDEA works without problems.
 * Reference: https://blog.csdn.net/tianlangstudio/article/details/123086300
 */
object TTLDemoV2 {

  /**
   * Entry point: configures checkpointing, declares the tables and submits the statements.
   *
   * @param args command-line arguments; not read by this method
   */
  def main(args: Array[String]): Unit = {
    val conf = new Configuration
    conf.setInteger(RestOptions.PORT, 28080)
    // NOTE(review): `conf` is never passed to the environment created below, so in the code
    // shown here the REST port setting is not applied anywhere — confirm whether that is intended.

    // Start from the given checkpoint; when not configured the job starts without state.
    // conf.setString("execution.savepoint.path","file:///Users/thomas990p/checkpointdir/41f86441111ad4492002188d8d4e1009/chk-136")

    // Do not use a local environment; obtaining the environment this way allows several
    // executeSql calls, and all of them take effect.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Keep the checkpoint data when the local job is stopped.
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setParallelism(1)
    // Checkpoint every 1000 ms with exactly-once mode.
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStateBackend(new FsStateBackend("file:///Users/thomas990p/checkpointdir"))
    env.disableOperatorChaining() // disable operator chaining globally

    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // Use the China time zone.
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    // Kafka source table s1 (id, name), CSV format, reading topic 's1' from the latest offset.
    tEnv.executeSql(
      """
        |CREATE TABLE s1 (
        | id String
        | ,name string
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 's1',
        | 'scan.startup.mode' = 'latest-offset',
        | 'properties.group.id' = 'g1',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'format' = 'csv'
        |)
        |""".stripMargin)

    /*
    Setting 'scan.startup.mode' = 'latest-offset' is not allowed here,
    otherwise it fails with: Unsupported options found for 'upsert-kafka'.
    It has to consume from earliest; this cannot be changed.
     */
    // Upsert-kafka source table s2 (id, age) keyed by id, reading topic 's3'.
    tEnv.executeSql(
      """
        |CREATE TABLE s2 (
        | id String
        | ,age int
        | ,primary key(id) not enforced
        |) WITH (
        | 'connector' = 'upsert-kafka',
        | 'topic' = 's3',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'key.format' = 'csv'
        | ,'value.format' = 'csv'
        |)
        |""".stripMargin)

    // Print sink receiving the rows of s1.
    tEnv.executeSql(
      """
        |CREATE TABLE s1_sink1 (
        | id String
        | ,name string
        |) WITH (
        | 'connector' = 'print'
        | ,'print-identifier'='s1 sink>>>>'
        |)
        |""".stripMargin)

    // Print sink receiving the rows of s2.
    tEnv.executeSql(
      """
        |CREATE TABLE s2_sink1 (
        | id String
        | ,age int
        |) WITH (
        | 'connector' = 'print'
        | ,'print-identifier'='s2 sink>>>>'
        |)
        |""".stripMargin)

    // Print sink with columns (id, name, age); no INSERT in this file writes to it directly.
    // NOTE(review): presumably the target of the compiled plan executed below — the plan file is not shown.
    tEnv.executeSql(
      """
        |CREATE TABLE sink1 (
        | id String
        | ,name string
        | ,age int
        |) WITH (
        | 'connector' = 'print'
        | ,'print-identifier'='>>>>'
        |)
        |""".stripMargin)

    // Each executeSql INSERT below is a separate submission, followed by a third one for the plan.
    tEnv.executeSql("insert into s1_sink1 select * from s1")
    tEnv.executeSql("insert into s2_sink1 select * from s2")

    // Execute the compiled plan stored in the given JSON file.
    tEnv.executeSql("EXECUTE PLAN 'file:///Users/thomas990p/flink-plain/plan1.json' ") // two 0ms keep left and right state forever
  }
}
{code}

> doc diff from code
> ------------------
>
>                 Key: FLINK-34001
>                 URL: https://issues.apache.org/jira/browse/FLINK-34001
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.18.0
>            Reporter: yong yang
>            Priority: Major
>
> doc:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which
> means the state retention is not enabled.
>  
> but i test find :
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which
> means the state is permanence keep!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)