Hi George,
Sorry I'm not familiar with the CDC connector details, but as long as your data
is of STRING type, you can extract fields from elements of a JSON array with a
query like this:
SELECT
JSON_VALUE(acc, '$.accountId') AS account_id,
JSON_VALUE(acc, '$.accountType') AS account_type,
JSON_VALUE(acc, '$.memberName') AS member_name
FROM postgres_catalog.inbound.adults AS t
CROSS JOIN UNNEST(
JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
) AS acc;
--
Best regards,
dylanhz
At 2025-09-16 13:34:53, "George" <[email protected]> wrote:
Hi there
Thanks for the reply, appreciated. I'm really seeing double with this one,
tried everything, yes it's simply a case of lack of experience on this side,
Would really appreciate assistance to get this working...
if we look at: Since Flink 1.20.2, if you need to work with a JSON array for
further processing, you can use:
JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)
implies me the following:
select JSON_QUERY(data, '$.account') AS accounts FROM
postgres_catalog.inbound.adults;
I would expect this to give me an array of the account documents/objects, but
at this point still as strings. to then unpack/unwind each, instead.
Flink SQL> select JSON_QUERY(data, '$.account') AS accounts FROM
postgres_catalog.inbound.adults;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException:
org.apache.flink.streaming.api.functions.source.SourceFunction
Some background. I have that adults.json coming into a field called data as
jsonb in a postgresql table, which is CDC consumed/pushed into flink. I'm
trying to unpack this into a flink table, thats proper/complex structured. Once
I have it there I can unwind the accounts and address objects into other
tables, as I have accounts and addresses coming from other tables also, so will
consolidate them all into one table to be sinkged to paimon and pushed to kafka
for consumption as structured objects.
On Tue, Sep 16, 2025 at 4:59 AM dylanhz <[email protected]> wrote:
Hi george,
JSON_QUERY returns a STRING by default (the JSON fragment as text). It is not
supported to directly CAST this string into complex types such as
ARRAY<ROW<...>>, so attempting to do so will result in a type conversion error.
Since Flink 1.20.2, if you need to work with a JSON array for further
processing, you can use:
JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)
This returns the JSON array as an array of string elements, each representing
one array item. You can then UNNEST this array and apply JSON_VALUE on each
element to extract the required fields.
For more details, please refer to the Flink documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/systemfunctions/
--
Best regards,
dylanhz
----- Original Message -----
From: George <[email protected]>
To: [email protected]
Sent: Mon, 15 Sep 2025 19:35:57 +0200
Subject: Cast error and battling to get around it.
Stuck with this, every where I go, different ideas...
data is a full string payload, as per attached.
the data.account field have an array of accounts
Flink 2.2 seem to be getting some new functionality from_json. but well, it's
not in 1.20 and 2.0 is not allowing me to CDC consume from a postgres table,
even when using the newest 3.4 library.
I've had this below error can't cast now on several attempts....
==> Cast function cannot convert value of type VARCHAR(2147483647) to type
VARCHAR(2147483647) ARRAY
Been stuck with this one for a while, hope someone can help.
Example payload attached.
Flink SQL> select
> JSON_VALUE(data, '$.nationalid') AS nationalid
> ,JSON_VALUE(data, '$._id') AS _id -- UUID
> generated by app, inside 'data' / json payload
> ,JSON_VALUE(data, '$.name') AS name
> ,JSON_VALUE(data, '$.surname') AS surname
> ,JSON_VALUE(data, '$.gender') AS gender
> ,JSON_VALUE(data, '$.dob') AS dob
> ,JSON_VALUE(data, '$.marital_status') AS marital_status
> ,JSON_VALUE(data, '$.status') AS status
> ,JSON_QUERY(data, '$.address') AS address
> ,CAST(
> JSON_QUERY(data, '$.account') AS
> ARRAY<ROW<
> fspiagentaccountid STRING
> ,accountid STRING
> ,fspiid STRING
> ,fspiagentid STRING
> ,accounttype STRING
> ,membername STRING
> ,cardholder STRING
> ,cardnumber STRING
> ,expdate STRING
> ,cardnetwork STRING
> ,issuingbank STRING
> >>
> ) AS account
> ,created_at AS created_at
> FROM postgres_catalog.inbound.adults;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot
convert value of type VARCHAR(2147483647) to type
RecordType(VARCHAR(2147483647) fspiagentaccountid, VARCHAR(2147483647)
accountid, VARCHAR(2147483647) fspiid, VARCHAR(2147483647) fspiagentid,
VARCHAR(2147483647) accounttype, VARCHAR(2147483647) membername,
VARCHAR(2147483647) cardholder, VARCHAR(2147483647) cardnumber,
VARCHAR(2147483647) expdate, VARCHAR(2147483647) cardnetwork,
VARCHAR(2147483647) issuingbank) ARRAY
Flink SQL> WITH unnested_accounts AS (
> SELECT
> JSON_VALUE(account_item, '$.fspiAgentAccountId') AS
> fspiagentaccountid
> ,JSON_VALUE(account_item, '$.accountId') AS accountid
> ,JSON_VALUE(account_item, '$.fspiId') AS fspiid
> ,JSON_VALUE(account_item, '$.fspiAgentId') AS fspiagentid
> ,JSON_VALUE(account_item, '$.accountType') AS accounttype
> ,JSON_VALUE(account_item, '$.memberName') AS membername
> ,JSON_VALUE(account_item, '$.cardHolder') AS cardholder
> ,JSON_VALUE(account_item, '$.cardNumber') AS cardnumber
> ,JSON_VALUE(account_item, '$.expDate') AS expdate
> ,JSON_VALUE(account_item, '$.cardNetwork') AS cardnetwork
> ,JSON_VALUE(account_item, '$.issuingBank') AS issuingbank
> FROM postgres_catalog.inbound.adults
> CROSS JOIN UNNEST(
> CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>)
> ) AS t(account_item)
> )
> SELECT
> ARRAY_AGG(
> ROW(
> fspiagentaccountid
> ,accountid
> ,fspiid
> ,fspiagentid
> ,accounttype
> ,membername
> ,cardhol
der
> ,cardnumber
> ,expdate
> ,cardnetwork
> ,issuingbank
> )
> ) AS accounts
> FROM unnested_accounts;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot
convert value of type VARCHAR(2147483647) to type VARCHAR(2147483647) ARRAY
--
You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!
Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!
--
You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!
Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!