Hi folks,
My name is Anton Rodriguez, I work for New Relic and I'm running a PoC
with Paimon. It's very exciting and promising.
I'm having an issue with the Paimon Sink v0.6-SNAPSHOT. I'm running this
code in my IDE:
| final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// CDC Source
SourceFunction<String> sourceFunction =
PostgreSQLSource.<String>builder()
.hostname("xxxxx")
.port(5432)
.database("yyyy")
.schemaList("public")
.tableList("aaaaa")
.username("bbb")
.password("zzzz")
.decodingPluginName("pgoutput")
.slotName("jjjjj")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
|
| DataStreamSource<String> dataStreamSource =
env.addSource(sourceFunction);
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' =
'paimon', 'warehouse'='file:/tmp/paimon')");
tableEnv.executeSql("USE CATALOG paimon");
tableEnv.registerDataStream("input_table", dataStreamSource);
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS
sink_paimon_table (\n" +
" json STRING PRIMARY KEY NOT ENFORCED\n" +
");");
tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT f0 as
json FROM input_table");|
But it doesn't write to the table or show any error. I tested the reader
with another writer and it's working. I'm also tested the Ververica CDC
Connector source with a Print Sink and it's also working. Any idea about
what could be wrong?
Thanks,
Anton