Hi all,

Hope someone can assist me.
I have the following Postgres table:

CREATE TABLE adults (
    id          SERIAL NOT NULL,
    nationalid  varchar(14) NOT NULL,
    data        JSONB,
    created_at  timestamptz DEFAULT NOW() NOT NULL,
    PRIMARY KEY (nationalid)
) TABLESPACE pg_default;

It contains the following payload. *See attached.*
Take note of the array of accounts; this is what's causing my problems at the moment.

I then use Flink CDC to consume this source table into the following Flink table:

CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
     id          BIGINT
    ,nationalid  VARCHAR(14)      --NOT NULL
    ,data        STRING
    ,created_at  TIMESTAMP_LTZ(3)
    ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
    ,PRIMARY KEY (nationalid) NOT ENFORCED
) WITH (
     'connector'      = 'postgres-cdc'
    ,'hostname'       = 'postgrescdc'
    -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
    ,'port'           = '5432'
    ,'username'       = 'dbadmin'
    ,'password'       = 'dbpassword'
    ,'database-name'  = 'demog'
    ,'schema-name'    = 'public'
    ,'table-name'     = 'adults'
    ,'slot.name'      = 'adults0'
    -- experimental feature: incremental snapshot (default off)
    ,'scan.incremental.snapshot.enabled' = 'true'
    -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
    ,'scan.startup.mode'    = 'initial'
    ,'decoding.plugin.name' = 'pgoutput'
);

I'd like to unpack this into a defined, structured Flink table, something like the one below:
CREATE OR REPLACE TABLE c_paimon.outbound.adults (
     _id             STRING
    ,name            STRING
    ,surname         STRING
    ,gender          STRING
    ,nationalid      STRING
    ,dob             STRING
    ,marital_status  STRING
    ,status          STRING
    ,account         ARRAY<ROW<
         fspiAgentAccountId  STRING
        ,accountId           STRING
        ,fspiId              STRING
        ,memberName          STRING
        ,accountType         STRING
        ,fspiAgentId         STRING
        ,expDate             STRING
        ,cardHolder          STRING
        ,cardNumber          STRING
        ,cardNetwork         STRING
        ,issuingBank         STRING
    >>
    ,address         ROW<
         street_1       STRING
        ,street_2       STRING
        ,neighbourhood  STRING
        ,town           STRING
        ,county         STRING
        ,province       STRING
        ,country        STRING
        ,country_code   STRING
        ,postal_code    STRING
        ,parcel_id      STRING
    >
    ,created_at      TIMESTAMP_LTZ(3)
    ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
    ,PRIMARY KEY (nationalid) NOT ENFORCED
) WITH (
     'file.format'                       = 'parquet'
    ,'bucket'                            = '2'
    ,'compaction.min.file-num'           = '2'
    ,'compaction.early-max.file-num'     = '50'
    ,'snapshot.time-retained'            = '1h'
    ,'snapshot.num-retained.min'         = '5'
    ,'snapshot.num-retained.max'         = '20'
    ,'table.exec.sink.upsert-materialize'= 'NONE'
);

Is anyone able to assist with the required INSERT statement? The intended destination table will be Apache Paimon.

One plan was to split the inbound table into adults, address and accounts tables:

- adults: I got this working.
- address: I need an upsert, as I have another inbound stream that potentially pushes the same address into the address table.
- accounts: this is where I fail. Accounts can take two forms, bank accounts or credit cards; which form applies is pretty much determined by which columns are completed in the accounts array item.

George

--
You have the obligation to inform one honestly of the risk, and as a person you are committed to educate yourself to the total risk in any activity!
Once informed & totally aware of the risk, every fool has the right to kill or injure themselves as they see fit!
adults.json
Description: application/json
