[ 
https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803482#comment-17803482
 ] 

yong yang commented on FLINK-34001:
-----------------------------------

{code:java}
//代码占位符
package com.yy.state.OperatorStateTTL

import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, 
StreamTableEnvironment}

import java.time.ZoneId
import scala.util.Random

/**
 * 看下 EXECUTE PLAN 和 statementset 怎么结合
 * 适配flink web ui 通过打包为jar提交flink任务 报错: Cannot have more than one execute() or 
executeAsync() call in a single environment
 * idea执行没有问题
 * 参考: https://blog.csdn.net/tianlangstudio/article/details/123086300
 */
object TTLDemoV2 {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration
    conf.setInteger(RestOptions.PORT, 28080)

    // 从指定的checkpoint启动 不配置则无状态启动
    //    
conf.setString("execution.savepoint.path","file:///Users/thomas990p/checkpointdir/41f86441111ad4492002188d8d4e1009/chk-136")

    //  不用local 使用下面的方式 就可以使用多个 execute sql 且都可以生效
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 本地任务停止时,保留 checkpoint 数据
    
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    env.setParallelism(1)

    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStateBackend(new 
FsStateBackend("file:///Users/thomas990p/checkpointdir"))
    env.disableOperatorChaining() // 禁用全局任务链

    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 指定国内时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))


    tEnv.executeSql(
      """
        |CREATE TABLE s1 (
        |  id String
        |  ,name string
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 's1',
        |  'scan.startup.mode' = 'latest-offset',
        |  'properties.group.id' = 'g1',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'csv'
        |)
        |""".stripMargin)

    /*
    不允许配置: 'scan.startup.mode' = 'latest-offset' 否则报错:
    Unsupported options found for 'upsert-kafka'.  他必须从earliest消费 无法修改
    */
    tEnv.executeSql(
      """
        |CREATE TABLE s2 (
        |  id String
        |  ,age int
        |  ,primary key(id) not enforced
        |) WITH (
        |  'connector' = 'upsert-kafka',
        |  'topic' = 's3',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'key.format' = 'csv'
        |  ,'value.format' = 'csv'
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |CREATE TABLE s1_sink1 (
        |  id String
        |  ,name string
        |) WITH (
        |  'connector' = 'print'
        |  ,'print-identifier'='s1 sink>>>>'
        |)
        |""".stripMargin)
    tEnv.executeSql(
      """
        |CREATE TABLE s2_sink1 (
        |  id String
        |  ,age int
        |) WITH (
        |  'connector' = 'print'
        |  ,'print-identifier'='s2 sink>>>>'
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |CREATE TABLE sink1 (
        |  id String
        |  ,name string
        |  ,age int
        |) WITH (
        |  'connector' = 'print'
        |  ,'print-identifier'='>>>>'
        |)
        |""".stripMargin)


    tEnv.executeSql("insert into s1_sink1 select * from s1")
    tEnv.executeSql("insert into s2_sink1 select * from s2")
    tEnv.executeSql("EXECUTE PLAN 
'file:///Users/thomas990p/flink-plain/plan1.json' ") // two 0ms keep left and 
right state forever 



  }

}
 {code}

> doc diff from code
> ------------------
>
>                 Key: FLINK-34001
>                 URL: https://issues.apache.org/jira/browse/FLINK-34001
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.18.0
>            Reporter: yong yang
>            Priority: Major
>
> doc:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which 
> means the state retention is not enabled. 
>  
> but i test find :
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which 
> means the state is permanence keep!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to