Hi all

Hope someone can assist me.

I have the following Postgres table

CREATE TABLE adults (
id SERIAL NOT NULL,
nationalid varchar(14) NOT NULL,
data JSONB,
created_at timestamptz DEFAULT NOW() NOT NULL,
PRIMARY KEY (nationalid)
) TABLESPACE pg_default;

That contains the following payload.

*See attached.*

-- Take note of the array of accounts; this is what's causing my problems at the moment.

I then use Flink CDC to consume this source table into the following Flink
Table.

CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
id BIGINT
,nationalid VARCHAR(14) --NOT NULL
,data STRING
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (nationalid) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432' -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'adults'
,'slot.name' = 'adults0'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);

I'd like to unpack the JSON in the data column into a structured Flink table,
something like the one below:

CREATE OR REPLACE TABLE c_paimon.outbound.adults (
_id STRING
,name STRING
,surname STRING
,gender STRING
,nationalid STRING
,dob STRING
,marital_status STRING
,status STRING
,account ARRAY<ROW<
fspiAgentAccountId STRING
,accountId STRING
,fspiId STRING
,memberName STRING
,accountType STRING
,fspiAgentId STRING
,expDate STRING
,cardHolder STRING
,cardNumber STRING
,cardNetwork STRING
,issuingBank STRING
>>
,address ROW<
street_1 STRING
,street_2 STRING
,neighbourhood STRING
,town STRING
,county STRING
,province STRING
,country STRING
,country_code STRING
,postal_code STRING
,parcel_id STRING
>
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (nationalid) NOT ENFORCED
) WITH (
'file.format' = 'parquet'
,'bucket' = '2'
,'compaction.min.file-num' = '2'
,'compaction.early-max.file-num' = '50'
,'snapshot.time-retained' = '1h'
,'snapshot.num-retained.min' = '5'
,'snapshot.num-retained.max' = '20'
,'table.exec.sink.upsert-materialize'= 'NONE'
);

Is anyone able to assist with the required INSERT statement?

The intended destination table will be in Apache Paimon.

One plan was to split the inbound table into adults, address & accounts
tables.

   - adults: I got this working.
   - address: I need an upsert here, as I have another inbound stream that
   potentially pushes the same address into the address table.
   - accounts: this is where I fail.


Accounts can take two forms: bank accounts or credit cards. Which one an
item is depends on which columns are populated in that accounts array item.
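
One possible shape for the accounts insert, as an untested sketch rather than
a known-good answer. It assumes Flink 1.20 or later (where JSON_QUERY accepts
RETURNING ARRAY<STRING>), that the array sits under '$.account' in the payload
(a path guessed from the target column name, since the payload is only in the
attachment), and a hypothetical c_paimon.outbound.accounts table that is not
defined above. The idea is to get each array item back as a JSON string, let
UNNEST produce one row per account, and pull the fields out with JSON_VALUE:

```sql
-- Hypothetical target table: c_paimon.outbound.accounts (not defined in this mail).
-- Assumed JSON path: '$.account'; adjust to match the real payload.
INSERT INTO c_paimon.outbound.accounts
SELECT
     a.nationalid
    ,JSON_VALUE(t.acc, '$.fspiAgentAccountId') AS fspiAgentAccountId
    ,JSON_VALUE(t.acc, '$.accountId')          AS accountId
    ,JSON_VALUE(t.acc, '$.fspiId')             AS fspiId
    ,JSON_VALUE(t.acc, '$.memberName')         AS memberName
    ,JSON_VALUE(t.acc, '$.accountType')        AS accountType
    ,JSON_VALUE(t.acc, '$.fspiAgentId')        AS fspiAgentId
    ,JSON_VALUE(t.acc, '$.expDate')            AS expDate
    ,JSON_VALUE(t.acc, '$.cardHolder')         AS cardHolder
    ,JSON_VALUE(t.acc, '$.cardNumber')         AS cardNumber
    ,JSON_VALUE(t.acc, '$.cardNetwork')        AS cardNetwork
    ,JSON_VALUE(t.acc, '$.issuingBank')        AS issuingBank
    ,a.created_at
FROM postgres_catalog.inbound.adults AS a
-- One output row per element of the accounts array.
CROSS JOIN UNNEST(
    JSON_QUERY(a.data, '$.account' RETURNING ARRAY<STRING>)
) AS t (acc);
```

By default JSON_VALUE returns NULL for a key that is missing from an item, so
a bank account and a credit card could be told apart afterwards with a
predicate such as JSON_VALUE(t.acc, '$.cardNumber') IS NOT NULL.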

George


Attachment: adults.json
Description: application/json
