Hi folks,

My name is Anton Rodriguez. I work for New Relic and I'm running a PoC with Paimon. It's very exciting and promising.

I'm having an issue with the Paimon Sink v0.6-SNAPSHOT. I'm running this code in my IDE:

|        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // CDC Source
        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
            .hostname("xxxxx")
            .port(5432)
            .database("yyyy")
            .schemaList("public")
            .tableList("aaaaa")
            .username("bbb")
            .password("zzzz")
            .decodingPluginName("pgoutput")
            .slotName("jjjjj")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();
|

|        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='file:/tmp/paimon')");
        tableEnv.executeSql("USE CATALOG paimon");

        tableEnv.registerDataStream("input_table", dataStreamSource);

        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS sink_paimon_table (\n" +
            "    json STRING PRIMARY KEY NOT ENFORCED\n" +
            ");");

        tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT f0 as json FROM input_table");|

But it doesn't write to the table or show any error. I tested the reader with another writer and it works. I also tested the Ververica CDC Connector source with a Print Sink, and that works too. Any idea what could be wrong?

Thanks,

Anton

