Hi all,

Posting some additional information...

Problem: when selecting from the table in Flink with

select * from postgres_catalog.inbound.children;

both the id column and the created_at column have values, and the entire payload as per children.json is present in the data field. The nationalId column is NULL, however.

If I inspect the table in Postgres using pgAdmin, all columns have values as expected. Note that nationalId is a NOT NULL column in Postgres and is also the primary key.

See the attached pg_tbl.sql for the Postgres table create statement, and flink_tbl.sql for the statement used to create the target CDC table.

I'm stuck. Please advise.

G

On Wed, Aug 27, 2025 at 3:53 PM George <[email protected]> wrote:

> Hi all...
>
> I have a data stream similar to the below going into a postgres table.
>
> Flink 1.20.1
> PostgreSQL 12
> flink-sql-connector-postgres-cdc-3.1.1.jar
>
> {
>   "_id": "96d35eb2-dc7f-40df-8128-30c58b250692",
>   "dob": "19/03/28",
>   "name": "Shaun",
>   "gender": "Male",
>   "address": {
>     "town": "Galway City",
>     "county": "Galway",
>     "country": "Ireland",
>     "province": "Connacht",
>     "street_1": "99 Fresh Street Street",
>     "street_2": "",
>     "parcel_id": "H91 Y9P7-22470",
>     "postal_code": "H91 Y9P7",
>     "country_code": "IE",
>     "neighbourhood": "Salthill"
>   },
>   "surname": "Doudigan",
>   "uniqueId": "0002003P",
>   "family_id": "4e6f3e02-91ac-42f9-a518-408e780a7c7b",
>   "father_idNumber": "7934317B",
>   "mother_idNumber": "0181947G"
> }
>
> The above is pushed into the below table.
> The entire message into the data column
> uniqueID is extracted and pushed into the uniqueId column
> _id is extracted and pushed into the _id column and
> created_at is auto populated with the current time.
>
> If I inspect the data via pgAdmin all is good, as it is supposed to be,
> all columns got values.
>
> CREATE TABLE adults (
>     id SERIAL NOT NULL,
>     uniqueId varchar(14) NOT NULL,
>     data JSONB,
>     created_at timestamptz DEFAULT NOW() NOT NULL,
>     PRIMARY KEY (uniqueId)
> ) TABLESPACE pg_default;
>
> This is my Flink table create, into which the data is supposed to be pushed
> via the CDC process.
>
> CREATE TABLE postgres_catalog.datasource.children (
>     `id` BIGINT,
>     `uniqueId` VARCHAR(14),
>     `data` STRING,
>     `created_at` TIMESTAMP_LTZ(3),
>     WATERMARK FOR `created_at` AS `created_at` - INTERVAL '5' SECOND,
>     PRIMARY KEY (uniqueId) NOT ENFORCED
> ) WITH (
>     'connector' = 'postgres-cdc'
>     ,'hostname' = 'postgrescdc'
>     ,'port' = '5432' -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
>     ,'username' = 'dbadmin'
>     ,'password' = 'dbpassword'
>     ,'database-name' = 'demog'
>     ,'schema-name' = 'public'
>     ,'table-name' = 'children'
>     ,'slot.name' = 'children'
>     ,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
>     ,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
>     ,'decoding.plugin.name' = 'pgoutput'
> );
>
> When I query this I get data in id, data, and created_at.
>
> *uniqueId is NULL though.*
>
> Once I get it into the above children table I want to unpack the data
> string/column into a table that includes a nested structure.
>
> G
>
> For reference...once the above is made to work I need to replicate it for
> the below message payload.
>
> {
>   "_id": "47868ae5-7266-4c08-baad-5cc9f09a29bf",
>   "dob": "95/09/03",
>   "name": "Cahir",
>   "gender": "M",
>   "status": "Living",
>   "account": [
>     {
>       "fspiId": "AIBKGB2",
>       "accountId": "88126271",
>       "memberName": "Allied Irish Banks plc",
>       "accountType": "Savings/Deposit",
>       "fspiAgentId": "AIBKGB2",
>       "fspiAgentAccountId": "AIBKGB2-88126271"
>     },
>     {
>       "expDate": "09/25",
>       "cardHolder": "C Skehan",
>       "cardNumber": "4969843748181157",
>       "cardNetwork": "Visa",
>       "issuingBank": "Bank of Ireland plc"
>     },
>     {
>       "expDate": "10/25",
>       "cardHolder": "C Skehan",
>       "cardNumber": "4936893387414336",
>       "cardNetwork": "Visa",
>       "issuingBank": "Citibank Europe plc"
>     }
>   ],
>   "address": {
>     "town": "South Dublin",
>     "county": "Dublin",
>     "country": "Ireland",
>     "province": "Leinster",
>     "street_1": "77 Mageean Street Street",
>     "street_2": "",
>     "parcel_id": "D24-22536",
>     "postal_code": "D24",
>     "country_code": "IE",
>     "neighbourhood": "Tallaght"
>   },
>   "partner": "2337266M",
>   "surname": "Skehan",
>   "uniqueId": "7087973R",
>   "family_id": "3dd62d47-cf0b-4e77-9d2d-7f58de28a404",
>   "marital_status": "Married"
> }
>
>
> --
> You have the obligation to inform one honestly of the risk, and as a person
> you are committed to educate yourself to the total risk in any activity!
>
> Once informed & totally aware of the risk,
> every fool has the right to kill or injure themselves as they see fit!
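
On the unpacking step mentioned in the quoted message, a minimal sketch of what such a query could look like in Flink SQL follows. It assumes the data column arrives as a JSON string shaped like the first sample payload and relies on Flink's built-in JSON_VALUE function. The output column names are illustrative choices only, not part of the existing setup, and the query is untested against it.

```sql
-- Sketch only: pull scalar fields out of the JSON text held in `data`.
-- JSON_VALUE returns STRING by default and NULL when the path is absent.
-- The aliases below are hypothetical names chosen for illustration.
SELECT
    `id`,
    JSON_VALUE(`data`, '$.uniqueId')            AS `unique_id`,
    JSON_VALUE(`data`, '$.name')                AS `first_name`,
    JSON_VALUE(`data`, '$.surname')             AS `surname`,
    JSON_VALUE(`data`, '$.address.town')        AS `town`,
    JSON_VALUE(`data`, '$.address.county')      AS `county`,
    JSON_VALUE(`data`, '$.address.postal_code') AS `postal_code`,
    `created_at`
FROM postgres_catalog.datasource.children;
```

This only covers scalar fields. The account array in the second payload would likely need JSON_QUERY or a different approach, which is not shown here.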
Attachments:
children.json (application/json)
flink_tbl.sql (binary data)
pg_tbl.sql (binary data)
