Good news~
On Thu, Nov 23, 2023 at 7:24 AM Anton Rodriguez <[email protected]> wrote: > > Found it! "env.enableCheckpointing(10_000);" was missing. Thanks! > > On 22/11/23 21:53, Anton Rodriguez wrote: > > Hi folks, > > My name is Anton Rodriguez, I work for New Relic and I'm running a PoC with > Paimon. It's very exciting and promising. > > I'm having an issue with the Paimon Sink v0.6-SNAPSHOT. I'm running this code > in my IDE: > > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > // CDC Source > SourceFunction<String> sourceFunction = > PostgreSQLSource.<String>builder() > .hostname("xxxxx") > .port(5432) > .database("yyyy") > .schemaList("public") > .tableList("aaaaa") > .username("bbb") > .password("zzzz") > .decodingPluginName("pgoutput") > .slotName("jjjjj") > .deserializer(new JsonDebeziumDeserializationSchema()) > .build(); > > DataStreamSource<String> dataStreamSource = > env.addSource(sourceFunction); > > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', > 'warehouse'='file:/tmp/paimon')"); > tableEnv.executeSql("USE CATALOG paimon"); > > tableEnv.registerDataStream("input_table", dataStreamSource); > > tableEnv.executeSql("CREATE TABLE IF NOT EXISTS sink_paimon_table > (\n" + > " json STRING PRIMARY KEY NOT ENFORCED\n" + > ");"); > > tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT f0 as json > FROM input_table"); > > But it doesn't write to the table or show any error. I tested the reader with > another writer and it's working. I'm also tested the Ververica CDC Connector > source with a Print Sink and it's also working. Any idea about what could be > wrong? > > Thanks, > > Anton > > >
