Hi,
I have the following use case: inserting bounded data into a dynamic table (upsert-kafka) using Flink 1.12 on YARN. However, the YARN application keeps running after the insert job has finished, and the YARN containers are not released.

I tried BatchTableEnvironment, but it fails with “Primary key and unique key are not supported yet”. I also tried StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH), but that does not work either.

Could you please offer some advice?

Regards

```
// test case code
import org.apache.flink.types.{Row, RowKind}

val (senv, btenv) = FlinkSession.getOrCreate()

// Bounded source: two INSERT rows exposed as the view "bound"
val table = btenv.fromValues(
  Row.ofKind(RowKind.INSERT, "1"),
  Row.ofKind(RowKind.INSERT, "2")).select("f0")
btenv.createTemporaryView("bound", table)

// Kafka sink table with a primary key
btenv.executeSql(s"create table if not exists test_result(" +
  "id string, PRIMARY KEY(id) NOT ENFORCED) WITH(" +
  s"'connector'='kafka','topic'='test_result','properties.bootstrap.servers'='${KAFKA_SERVER}'," +
  s"'key.format'='json','value.format'='json')")

// Write the bounded data into the sink
btenv.executeSql("insert into test_result select f0 from bound")
```