Hi Dylan

Not to worry about the CDC bit, that was just background information. Basically I'm posting the record/document into a column called data; Flink CDC then consumes that via PostgreSQL and exposes the PostgreSQL table inside Flink.

For everyone's background information, the PostgreSQL table structure (just background):
CREATE TABLE adults (
    id          SERIAL NOT NULL,
    nationalid  varchar(14) NOT NULL,
    data        JSONB,
    created_at  timestamptz DEFAULT NOW() NOT NULL,
    PRIMARY KEY (nationalid)
) TABLESPACE pg_default;

For everyone's background information, the Apache Flink table that is populated with these records:

CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
    id          BIGINT            -- This is a PostgreSQL SERIAL generated field
    ,nationalid VARCHAR(14)       -- NOT NULL
    ,data       STRING            -- JSONB payload
    ,created_at TIMESTAMP_LTZ(3)
    ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
    ,PRIMARY KEY (nationalid) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc'
    ,'hostname' = 'postgrescdc'
    ,'port' = '5432'  -- NOTE: this is the port of the db on the container, not the external Docker port exposed via a port mapping.
    ,'username' = 'dbadmin'
    ,'password' = 'dbpassword'
    ,'database-name' = 'demog'
    ,'schema-name' = 'public'
    ,'table-name' = 'adults'
    ,'slot.name' = 'adults0'
    ,'scan.incremental.snapshot.enabled' = 'true'  -- experimental feature: incremental snapshot (default off)
    ,'scan.startup.mode' = 'initial'  -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
    ,'decoding.plugin.name' = 'pgoutput'
);

Flink SQL> SELECT
>     JSON_VALUE(acc, '$.accountId')   AS account_id,
>     JSON_VALUE(acc, '$.accountType') AS account_type,
>     JSON_VALUE(acc, '$.memberName')  AS member_name
> FROM postgres_catalog.inbound.adults AS t
> CROSS JOIN UNNEST(
>     JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
> ) AS acc;
[ERROR] Could not execute SQL statement.
Reason: java.lang.ClassNotFoundException:
org.apache.flink.streaming.api.functions.source.SourceFunction

G

On Tue, Sep 16, 2025 at 8:59 AM dylanhz <[email protected]> wrote:

> Hi George,
>
> Sorry, I'm not familiar with the CDC connector details, but as long as your
> data is of STRING type, you can extract fields from elements of a JSON
> array with a query like this:
>
> SELECT
>     JSON_VALUE(acc, '$.accountId')   AS account_id,
>     JSON_VALUE(acc, '$.accountType') AS account_type,
>     JSON_VALUE(acc, '$.memberName')  AS member_name
> FROM postgres_catalog.inbound.adults AS t
> CROSS JOIN UNNEST(
>     JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
> ) AS acc;
>
> --
> Best regards,
> dylanhz
>
> At 2025-09-16 13:34:53, "George" <[email protected]> wrote:
>
> Hi there
>
> Thanks for the reply, appreciated. I'm really seeing double with this one;
> I've tried everything. Yes, it's simply a lack of experience on my side.
> I would really appreciate assistance to get this working.
>
> If we look at: "Since Flink 1.20.2, if you need to work with a JSON
> array for further processing, you can use:
>
> JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)"
>
> that implies to me the following:
>
> select JSON_QUERY(data, '$.account') AS accounts
> FROM postgres_catalog.inbound.adults;
>
> I would expect this to give me an array of the account documents/objects,
> at this point still as strings, to then unpack/unwind each of them.
>
> Flink SQL> select JSON_QUERY(data, '$.account') AS accounts FROM
> postgres_catalog.inbound.adults;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.streaming.api.functions.source.SourceFunction
>
> Some background: I have that adults.json coming into a field called data
> as jsonb in a PostgreSQL table, which is CDC consumed/pushed into Flink.
> I'm trying to unpack this into a Flink table that is properly/complex
> structured.
> Once I have it there I can unwind the accounts and address objects into
> other tables, as I have accounts and addresses coming from other tables
> also, so I will consolidate them all into one table to be sunk to Paimon
> and pushed to Kafka for consumption as structured objects.
>
> On Tue, Sep 16, 2025 at 4:59 AM dylanhz <[email protected]> wrote:
>
>> Hi George,
>>
>> JSON_QUERY returns a STRING by default (the JSON fragment as text). It
>> is not supported to directly CAST this string into complex types such as
>> ARRAY<ROW<...>>, so attempting to do so will result in a type conversion
>> error.
>>
>> Since Flink 1.20.2, if you need to work with a JSON array for further
>> processing, you can use:
>>
>> JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)
>>
>> This returns the JSON array as an array of string elements, each
>> representing one array item. You can then UNNEST this array and apply
>> JSON_VALUE on each element to extract the required fields.
>>
>> For more details, please refer to the Flink documentation:
>> https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/systemfunctions/
>>
>> --
>> Best regards,
>> dylanhz
>>
>> ----- Original Message -----
>> From: George <[email protected]>
>> To: [email protected]
>> Sent: Mon, 15 Sep 2025 19:35:57 +0200
>> Subject: Cast error and battling to get around it.
>>
>> Stuck with this; everywhere I go, different ideas...
>>
>> data is a full string payload, as per the attached.
>>
>> The data.account field has an array of accounts.
>>
>> Flink 2.2 seems to be getting some new functionality (from_json), but
>> it's not in 1.20, and 2.0 is not allowing me to CDC consume from a
>> Postgres table, even when using the newest 3.4 library.
>>
>> I've had this "can't cast" error below on several attempts...
>>
>> ==> Cast function cannot convert value of type VARCHAR(2147483647) to
>> type VARCHAR(2147483647) ARRAY
>>
>> Been stuck with this one for a while; hope someone can help.
>>
>> Example payload attached.
>>
>> Flink SQL> select
>> >     JSON_VALUE(data, '$.nationalid') AS nationalid
>> >     ,JSON_VALUE(data, '$._id') AS _id  -- UUID generated by app, inside 'data' / json payload
>> >     ,JSON_VALUE(data, '$.name') AS name
>> >     ,JSON_VALUE(data, '$.surname') AS surname
>> >     ,JSON_VALUE(data, '$.gender') AS gender
>> >     ,JSON_VALUE(data, '$.dob') AS dob
>> >     ,JSON_VALUE(data, '$.marital_status') AS marital_status
>> >     ,JSON_VALUE(data, '$.status') AS status
>> >     ,JSON_QUERY(data, '$.address') AS address
>> >     ,CAST(
>> >         JSON_QUERY(data, '$.account') AS
>> >         ARRAY<ROW<
>> >             fspiagentaccountid STRING
>> >             ,accountid STRING
>> >             ,fspiid STRING
>> >             ,fspiagentid STRING
>> >             ,accounttype STRING
>> >             ,membername STRING
>> >             ,cardholder STRING
>> >             ,cardnumber STRING
>> >             ,expdate STRING
>> >             ,cardnetwork STRING
>> >             ,issuingbank STRING
>> >         >>
>> >     ) AS account
>> >     ,created_at AS created_at
>> > FROM postgres_catalog.inbound.adults;
>> [ERROR] Could not execute SQL statement.
Reason:
>> org.apache.calcite.sql.validate.SqlValidatorException: Cast function
>> cannot convert value of type VARCHAR(2147483647) to type
>> RecordType(VARCHAR(2147483647) fspiagentaccountid, VARCHAR(2147483647)
>> accountid, VARCHAR(2147483647) fspiid, VARCHAR(2147483647) fspiagentid,
>> VARCHAR(2147483647) accounttype, VARCHAR(2147483647) membername,
>> VARCHAR(2147483647) cardholder, VARCHAR(2147483647) cardnumber,
>> VARCHAR(2147483647) expdate, VARCHAR(2147483647) cardnetwork,
>> VARCHAR(2147483647) issuingbank) ARRAY
>>
>> Flink SQL> WITH unnested_accounts AS (
>> >     SELECT
>> >         JSON_VALUE(account_item, '$.fspiAgentAccountId') AS fspiagentaccountid
>> >         ,JSON_VALUE(account_item, '$.accountId') AS accountid
>> >         ,JSON_VALUE(account_item, '$.fspiId') AS fspiid
>> >         ,JSON_VALUE(account_item, '$.fspiAgentId') AS fspiagentid
>> >         ,JSON_VALUE(account_item, '$.accountType') AS accounttype
>> >         ,JSON_VALUE(account_item, '$.memberName') AS membername
>> >         ,JSON_VALUE(account_item, '$.cardHolder') AS cardholder
>> >         ,JSON_VALUE(account_item, '$.cardNumber') AS cardnumber
>> >         ,JSON_VALUE(account_item, '$.expDate') AS expdate
>> >         ,JSON_VALUE(account_item, '$.cardNetwork') AS cardnetwork
>> >         ,JSON_VALUE(account_item, '$.issuingBank') AS issuingbank
>> >     FROM postgres_catalog.inbound.adults
>> >     CROSS JOIN UNNEST(
>> >         CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>)
>> >     ) AS t(account_item)
>> > )
>> > SELECT
>> >     ARRAY_AGG(
>> >         ROW(
>> >             fspiagentaccountid
>> >             ,accountid
>> >             ,fspiid
>> >             ,fspiagentid
>> >             ,accounttype
>> >             ,membername
>> >             ,cardholder
>> >             ,cardnumber
>> >             ,expdate
>> >             ,cardnetwork
>> >             ,issuingbank
>> >         )
>> >     ) AS accounts
>> > FROM unnested_accounts;
>> [ERROR] Could not execute SQL statement.
Reason:
>> org.apache.calcite.sql.validate.SqlValidatorException: Cast function
>> cannot convert value of type VARCHAR(2147483647) to type
>> VARCHAR(2147483647) ARRAY

--
You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!

Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!
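For reference, the per-element extraction dylanhz suggests can be sanity-checked outside Flink. This is only a minimal Python sketch of the same logic: the sample record below is hypothetical (the actual adults.json attachment is not reproduced here), with field names taken from the queries above.

```python
import json

# Hypothetical sample record, shaped like the 'data' JSONB column implied by
# the queries in this thread (the real adults.json attachment is not shown).
record = {
    "nationalid": "8001015555088",
    "name": "Jane",
    "account": [
        {"accountId": "acc-1", "accountType": "card", "memberName": "Jane"},
        {"accountId": "acc-2", "accountType": "wallet", "memberName": "Jane"},
    ],
}
data = json.dumps(record)  # the STRING payload as the Flink table sees it

# JSON_QUERY(data, '$.account' RETURNING ARRAY<STRING>):
# the array is returned with each element still serialized as a JSON string.
accounts = [json.dumps(item) for item in json.loads(data)["account"]]

# CROSS JOIN UNNEST(...) AS acc, then JSON_VALUE(acc, '$.accountId') etc.:
# one output row per array element, scalar fields extracted per element.
rows = [
    (
        json.loads(acc).get("accountId"),
        json.loads(acc).get("accountType"),
        json.loads(acc).get("memberName"),
    )
    for acc in accounts
]
print(rows)
```

Note that the ClassNotFoundException above is unrelated to the query shape itself: org.apache.flink.streaming.api.functions.source.SourceFunction is a legacy API that no longer exists in newer Flink runtimes, so that error usually points to a CDC connector jar built against an older Flink version rather than to the SQL.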
