Hi George,

Sorry, I'm not familiar with the CDC connector details, but as long as your data 
is of STRING type, you can extract fields from the elements of a JSON array with 
a query like this:


SELECT
    JSON_VALUE(acc, '$.accountId') AS account_id,
    JSON_VALUE(acc, '$.accountType') AS account_type,
    JSON_VALUE(acc, '$.memberName') AS member_name
FROM postgres_catalog.inbound.adults AS t
CROSS JOIN UNNEST(
    JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
) AS u(acc);
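
If it helps to rule out the connector, the same pattern can be tried on an 
inline row first. This is only a sketch: the JSON payload and the aliases 
t, u and acc are made up for illustration, and it assumes a Flink version 
where RETURNING ARRAY<STRING> is available.

```sql
-- Hypothetical inline payload standing in for the CDC table's data column.
SELECT
    JSON_VALUE(acc, '$.accountId')   AS account_id,
    JSON_VALUE(acc, '$.accountType') AS account_type
FROM (
    VALUES ('{"account":[{"accountId":"A1","accountType":"savings"},{"accountId":"A2","accountType":"cheque"}]}')
) AS t(data)
-- Each element of the JSON array becomes one row holding one JSON object as a string.
CROSS JOIN UNNEST(
    JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
) AS u(acc);
```

If this runs but the query against postgres_catalog does not, the problem is 
more likely in the catalog or connector setup than in the JSON functions.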





--

Best regards,
dylanhz



At 2025-09-16 13:34:53, "George" <[email protected]> wrote:

Hi there


Thanks for the reply, appreciated. I'm really seeing double with this one and 
have tried everything. Yes, it's simply a lack of experience on my side. 
I would really appreciate some assistance to get this working...


if we look at:  Since Flink 1.20.2, if you need to work with a JSON array for 
further processing, you can use:

JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)



To me, this implies the following:


select JSON_QUERY(data, '$.account') AS accounts
FROM postgres_catalog.inbound.adults;


I would expect this to give me an array of the account documents/objects, 
still as strings at this point, which I could then unpack/unwind one by one. 
Instead I get:


Flink SQL> select JSON_QUERY(data, '$.account')       AS accounts FROM postgres_catalog.inbound.adults;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.api.functions.source.SourceFunction





Some background: I have that adults.json coming into a field called data, of 
type jsonb, in a PostgreSQL table, which is CDC consumed/pushed into Flink. I'm 
trying to unpack this into a Flink table that is properly structured with 
complex types. Once I have it there, I can unwind the account and address 
objects into other tables. As I have accounts and addresses coming from other 
tables as well, I will consolidate them all into one table to be sunk to Paimon 
and pushed to Kafka for consumption as structured objects.






On Tue, Sep 16, 2025 at 4:59 AM dylanhz <[email protected]> wrote:


Hi george,




JSON_QUERY returns a STRING by default (the JSON fragment as text). It is not 
supported to directly CAST this string into complex types such as 
ARRAY<ROW<...>>, so attempting to do so will result in a type conversion error.




Since Flink 1.20.2, if you need to work with a JSON array for further 
processing, you can use:

JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)

This returns the JSON array as an array of string elements, each representing 
one array item. You can then UNNEST this array and apply JSON_VALUE on each 
element to extract the required fields.
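
As a quick way to see the difference between the two forms, something like 
the following can be run in the SQL client. It is only a sketch on a made-up 
inline JSON literal, not on your table:

```sql
-- Hypothetical inline JSON; no table or connector involved.
SELECT
    -- Default: the whole array comes back as one STRING.
    JSON_QUERY('{"account":[{"accountId":"A1"},{"accountId":"A2"}]}',
               '$.account') AS accounts_as_string,
    -- With RETURNING: an ARRAY<STRING>, one element per array item.
    JSON_QUERY('{"account":[{"accountId":"A1"},{"accountId":"A2"}]}',
               '$.account' RETURNING ARRAY<STRING>) AS accounts_as_array;
```

Only the second column can be passed to UNNEST.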



For more details, please refer to the Flink documentation: 
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/systemfunctions/





--

Best regards,
dylanhz


----- Original Message -----
From: George <[email protected]>
To: [email protected]
Sent: Mon, 15 Sep 2025 19:35:57 +0200
Subject: Cast error and battling to get around it.




I'm stuck with this; everywhere I look, I find different ideas...


data is a full string payload, as per the attached example.


The data.account field holds an array of accounts.


Flink 2.2 seems to be getting some new functionality, from_json, but it's 
not in 1.20, and 2.0 does not let me CDC consume from a Postgres table, 
even when using the newest 3.4 library.




I've now hit the "cannot cast" error below on several attempts...


==>   Cast function cannot convert value of type VARCHAR(2147483647) to type VARCHAR(2147483647) ARRAY


Been stuck with this one for a while, hope someone can help.


Example payload attached.

Flink SQL> select
>      JSON_VALUE(data, '$.nationalid')       AS nationalid
>     ,JSON_VALUE(data, '$._id')              AS _id              -- UUID generated by app, inside 'data' / json payload
>     ,JSON_VALUE(data, '$.name')             AS name
>     ,JSON_VALUE(data, '$.surname')          AS surname
>     ,JSON_VALUE(data, '$.gender')           AS gender
>     ,JSON_VALUE(data, '$.dob')              AS dob
>     ,JSON_VALUE(data, '$.marital_status')   AS marital_status
>     ,JSON_VALUE(data, '$.status')           AS status
>     ,JSON_QUERY(data, '$.address')          AS address
>     ,CAST(
>         JSON_QUERY(data, '$.account')       AS
>         ARRAY<ROW<
>              fspiagentaccountid STRING
>             ,accountid   STRING
>             ,fspiid      STRING
>             ,fspiagentid STRING
>             ,accounttype STRING
>             ,membername  STRING
>             ,cardholder  STRING
>             ,cardnumber  STRING
>             ,expdate     STRING
>             ,cardnetwork STRING
>             ,issuingbank STRING
>         >>
>     ) AS account
>     ,created_at                             AS created_at
> FROM postgres_catalog.inbound.adults;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot 
convert value of type VARCHAR(2147483647) to type 
RecordType(VARCHAR(2147483647) fspiagentaccountid, VARCHAR(2147483647) 
accountid, VARCHAR(2147483647) fspiid, VARCHAR(2147483647) fspiagentid, 
VARCHAR(2147483647) accounttype, VARCHAR(2147483647) membername, 
VARCHAR(2147483647) cardholder, VARCHAR(2147483647) cardnumber, 
VARCHAR(2147483647) expdate, VARCHAR(2147483647) cardnetwork, 
VARCHAR(2147483647) issuingbank) ARRAY

Flink SQL> WITH unnested_accounts AS (
>     SELECT
>          JSON_VALUE(account_item, '$.fspiAgentAccountId') AS fspiagentaccountid
>         ,JSON_VALUE(account_item, '$.accountId')         AS accountid
>         ,JSON_VALUE(account_item, '$.fspiId')            AS fspiid
>         ,JSON_VALUE(account_item, '$.fspiAgentId')       AS fspiagentid
>         ,JSON_VALUE(account_item, '$.accountType')       AS accounttype
>         ,JSON_VALUE(account_item, '$.memberName')        AS membername
>         ,JSON_VALUE(account_item, '$.cardHolder')        AS cardholder
>         ,JSON_VALUE(account_item, '$.cardNumber')        AS cardnumber
>         ,JSON_VALUE(account_item, '$.expDate')           AS expdate
>         ,JSON_VALUE(account_item, '$.cardNetwork')       AS cardnetwork
>         ,JSON_VALUE(account_item, '$.issuingBank')       AS issuingbank
>     FROM postgres_catalog.inbound.adults
>     CROSS JOIN UNNEST(
>         CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>)
>     ) AS t(account_item)
> )
> SELECT
>     ARRAY_AGG(
>         ROW(
>              fspiagentaccountid
>             ,accountid
>             ,fspiid
>             ,fspiagentid
>             ,accounttype
>             ,membername
>             ,cardholder
>             ,cardnumber
>             ,expdate
>             ,cardnetwork
>             ,issuingbank
>         )
>     ) AS accounts
> FROM unnested_accounts;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot 
convert value of type VARCHAR(2147483647) to type VARCHAR(2147483647) ARRAY









--

You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!

Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!



