Hi all

Posting some additional information...

Problem: when selecting from the table in Flink with
"*select * from postgres_catalog.inbound.children;*"

*results in:*

Both the id column and created_at column have values.

The entire payload, as per children.json, is also present in the data field.

The nationalId column is null, however.

If I inspect the table in Postgres using pgAdmin, all columns have
values as expected. Note that nationalId is a NOT NULL column in Postgres
and is also the primary key.
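
One check that may help narrow this down, offered as an assumption rather than a confirmed cause: PostgreSQL folds unquoted identifiers to lower case, so a column declared as nationalId without double quotes is stored as nationalid. The query below is a minimal sketch that uses the standard information_schema.columns view, with the schema and table names taken from the Flink connector options (public, children). It lists the column names exactly as Postgres stores them, so they can be compared with the column names declared on the Flink side.

```sql
-- List the column names exactly as PostgreSQL stores them.
-- Assumption: the source table is public.children, as in the connector options.
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name = 'children'
ORDER BY ordinal_position;
```

If a stored name differs in case from the corresponding Flink column name, that mismatch would be worth ruling out first.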

See the attached pg_tbl.sql for the Postgres table definition,

and flink_tbl.sql, which was used to create the target CDC table.

I'm stuck. Please advise.

G

On Wed, Aug 27, 2025 at 3:53 PM George <[email protected]> wrote:

> Hi all...
>
> I have a data stream similar to the below going into a postgres table.
>
> Flink 1.20.1
> PostgreSQL 12
> flink-sql-connector-postgres-cdc-3.1.1.jar
>
> {
> "_id": "96d35eb2-dc7f-40df-8128-30c58b250692",
> "dob": "19/03/28",
> "name": "Shaun",
> "gender": "Male",
> "address": {
> "town": "Galway City",
> "county": "Galway",
> "country": "Ireland",
> "province": "Connacht",
> "street_1": "99 Fresh Street Street",
> "street_2": "",
> "parcel_id": "H91 Y9P7-22470",
> "postal_code": "H91 Y9P7",
> "country_code": "IE",
> "neighbourhood": "Salthill"
> },
> "surname": "Doudigan",
> "uniqueId": "0002003P",
> "family_id": "4e6f3e02-91ac-42f9-a518-408e780a7c7b",
> "father_idNumber": "7934317B",
> "mother_idNumber": "0181947G"
> }
>
> The above is pushed into the table below:
> the entire message goes into the data column,
> uniqueId is extracted and pushed into the uniqueId column,
> _id is extracted and pushed into the _id column, and
> created_at is auto-populated with the current time.
>
> If I inspect the data via pgAdmin, everything is as it is supposed to be:
> all columns have values.
>
> CREATE TABLE adults (
> id SERIAL NOT NULL,
> uniqueId varchar(14) NOT NULL,
> data JSONB,
> created_at timestamptz DEFAULT NOW() NOT NULL,
> PRIMARY KEY (uniqueId)
> ) TABLESPACE pg_default;
>
> This is the Flink CREATE TABLE statement for the table into which the data
> is supposed to be pushed via the CDC process.
>
> CREATE TABLE postgres_catalog.datasource.children (
> `id` BIGINT,
> `uniqueId` VARCHAR(14),
> `data` STRING,
> `created_at` TIMESTAMP_LTZ(3),
> WATERMARK FOR `created_at` AS `created_at` - INTERVAL '5' SECOND,
> PRIMARY KEY (uniqueId) NOT ENFORCED
> ) WITH (
> 'connector' = 'postgres-cdc'
> ,'hostname' = 'postgrescdc'
> -- NOTE: this is the port of the db on the container,
> -- not the external docker exported port via a port mapping.
> ,'port' = '5432'
> ,'username' = 'dbadmin'
> ,'password' = 'dbpassword'
> ,'database-name' = 'demog'
> ,'schema-name' = 'public'
> ,'table-name' = 'children'
> ,'slot.name' = 'children'
> -- experimental feature: incremental snapshot (default off)
> ,'scan.incremental.snapshot.enabled' = 'true'
> -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
> ,'scan.startup.mode' = 'initial'
> ,'decoding.plugin.name' = 'pgoutput'
> );
>
> When I query this, I get data in id, data, and created_at.
>
> *uniqueId is NULL though.*
>
> Once I have the data in the children table above, I want to unpack the data
> string/column into a table that includes a nested structure.
>
> G
>
> For reference: once the above works, I need to replicate it for
> the message payload below.
>
> {
> "_id": "47868ae5-7266-4c08-baad-5cc9f09a29bf",
> "dob": "95/09/03",
> "name": "Cahir",
> "gender": "M",
> "status": "Living",
> "account": [
> {
> "fspiId": "AIBKGB2",
> "accountId": "88126271",
> "memberName": "Allied Irish Banks plc",
> "accountType": "Savings/Deposit",
> "fspiAgentId": "AIBKGB2",
> "fspiAgentAccountId": "AIBKGB2-88126271"
> },
> {
> "expDate": "09/25",
> "cardHolder": "C Skehan",
> "cardNumber": "4969843748181157",
> "cardNetwork": "Visa",
> "issuingBank": "Bank of Ireland plc"
> },
> {
> "expDate": "10/25",
> "cardHolder": "C Skehan",
> "cardNumber": "4936893387414336",
> "cardNetwork": "Visa",
> "issuingBank": "Citibank Europe plc"
> }
> ],
> "address": {
> "town": "South Dublin",
> "county": "Dublin",
> "country": "Ireland",
> "province": "Leinster",
> "street_1": "77 Mageean Street Street",
> "street_2": "",
> "parcel_id": "D24-22536",
> "postal_code": "D24",
> "country_code": "IE",
> "neighbourhood": "Tallaght"
> },
> "partner": "2337266M",
> "surname": "Skehan",
> "uniqueId": "7087973R",
> "family_id": "3dd62d47-cf0b-4e77-9d2d-7f58de28a404",
> "marital_status": "Married"
> }
>
>
> --
> You have the obligation to inform one honestly of the risk, and as a person
> you are committed to educate yourself to the total risk in any activity!
>
> Once informed & totally aware of the risk,
> every fool has the right to kill or injure themselves as they see fit!
>



Attachment: children.json
Description: application/json

Attachment: flink_tbl.sql
Description: Binary data

Attachment: pg_tbl.sql
Description: Binary data
