Hi,

I have the use case below.


I insert bounded data into a dynamic table (upsert-kafka) using Flink 1.12 on YARN, 
but the YARN application is still running after the insert job finishes, and the 
YARN containers are not released.


I tried to use BatchTableEnvironment, but got "Primary key and unique key are not 
supported yet"; I also tried 
StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH), but 
it does not work either.
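For reference, the batch-mode attempt looked roughly like this (a minimal sketch; the environment setup mirrors what our `FlinkSession` helper does internally, and the exact wiring is an assumption):

```scala
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Sketch of the attempt: run the pipeline in BATCH execution mode so that the
// job (and therefore the YARN containers) should terminate once the bounded
// input has been fully consumed.
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setRuntimeMode(RuntimeExecutionMode.BATCH)
val btenv = StreamTableEnvironment.create(senv)
```

Even with BATCH mode set on the StreamExecutionEnvironment, the YARN application keeps running after the insert finishes.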


Could you please offer some advice?


Regards




```
[test case code]
val (senv, btenv) = FlinkSession.getOrCreate()

// Build a bounded table of two insert-only rows.
val table = btenv.fromValues(
  Row.ofKind(RowKind.INSERT, "1"),
  Row.ofKind(RowKind.INSERT, "2")).select("f0")

btenv.createTemporaryView("bound", table)

// The sink declares a primary key, so the upsert-kafka connector is used
// (the plain kafka connector rejects PRIMARY KEY definitions).
btenv.executeSql(
  s"""CREATE TABLE IF NOT EXISTS test_result (
     |  id STRING,
     |  PRIMARY KEY (id) NOT ENFORCED
     |) WITH (
     |  'connector' = 'upsert-kafka',
     |  'topic' = 'test_result',
     |  'properties.bootstrap.servers' = '${KAFKA_SERVER}',
     |  'key.format' = 'json',
     |  'value.format' = 'json'
     |)""".stripMargin)

btenv.executeSql("INSERT INTO test_result SELECT f0 FROM bound")
```