Good news~

On Thu, Nov 23, 2023 at 7:24 AM Anton Rodriguez <[email protected]> wrote:
>
> Found it! "env.enableCheckpointing(10_000);" was missing. Thanks!
>
> On 22/11/23 21:53, Anton Rodriguez wrote:
>
> Hi folks,
>
> My name is Anton Rodriguez, I work for New Relic and I'm running a PoC with 
> Paimon. It's very exciting and promising.
>
> I'm having an issue with the Paimon Sink v0.6-SNAPSHOT. I'm running this code 
> in my IDE:
>
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // CDC Source
>         SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
>             .hostname("xxxxx")
>             .port(5432)
>             .database("yyyy")
>             .schemaList("public")
>             .tableList("aaaaa")
>             .username("bbb")
>             .password("zzzz")
>             .decodingPluginName("pgoutput")
>             .slotName("jjjjj")
>             .deserializer(new JsonDebeziumDeserializationSchema())
>             .build();
>
>         DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
>
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='file:/tmp/paimon')");
>         tableEnv.executeSql("USE CATALOG paimon");
>
>         tableEnv.registerDataStream("input_table", dataStreamSource);
>
>         tableEnv.executeSql("CREATE TABLE IF NOT EXISTS sink_paimon_table (\n" +
>             "    json STRING PRIMARY KEY NOT ENFORCED\n" +
>             ");");
>
>         tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT f0 as json FROM input_table");
>
> But it doesn't write to the table or show any error. I tested the reader with 
> another writer and it's working. I also tested the Ververica CDC Connector 
> source with a Print Sink and it's also working. Any idea what could be 
> wrong?
>
> Thanks,
>
> Anton
>
>
>
