Hi Dylan

Not to worry about the CDC bit; it was just background information. Basically,
I'm posting the record/document into a column called data. Flink CDC then
consumes the PostgreSQL table and exposes it inside Flink.

For everyone's background, the PostgreSQL table structure:

CREATE TABLE adults (
id SERIAL NOT NULL,
nationalid varchar(14) NOT NULL,
data JSONB,
created_at timestamptz DEFAULT NOW() NOT NULL,
PRIMARY KEY (nationalid)
) TABLESPACE pg_default;

For everyone's background, the Apache Flink table that is populated with
these records:

CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
id BIGINT -- This is a postgresql Serial generated field
,nationalid VARCHAR(14) -- NOT NULL
,data STRING -- JSONB Payload
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (nationalid) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432' -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'adults'
,'slot.name' = 'adults0'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);


Flink SQL> SELECT
>     JSON_VALUE(acc, '$.accountId') AS account_id,
>     JSON_VALUE(acc, '$.accountType') AS account_type,
>     JSON_VALUE(acc, '$.memberName') AS member_name
> FROM postgres_catalog.inbound.adults AS t
> CROSS JOIN UNNEST(
>     JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
> ) AS acc;


[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException:
org.apache.flink.streaming.api.functions.source.SourceFunction
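
One way to narrow this down, as a sketch only: the same ClassNotFoundException
also came back for the plain JSON_QUERY select quoted further down, so the
failure may be in loading the CDC source table rather than in the JSON
functions. Running the same query shape against an inline VALUES row takes the
connector out of the picture. The sample payload values and the alias u(acc)
are made up for illustration; the field names follow the query above.

```sql
-- Same UNNEST + JSON_VALUE pattern as above, but reading from an inline
-- row instead of postgres_catalog.inbound.adults (hypothetical sample data).
SELECT
    JSON_VALUE(acc, '$.accountId')   AS account_id,
    JSON_VALUE(acc, '$.accountType') AS account_type,
    JSON_VALUE(acc, '$.memberName')  AS member_name
FROM (
    VALUES (
        '{"account":[{"accountId":"A1","accountType":"savings","memberName":"Jane"},{"accountId":"A2","accountType":"cheque","memberName":"Jane"}]}'
    )
) AS t(data)
CROSS JOIN UNNEST(
    -- RETURNING ARRAY<STRING> yields one STRING element per array item
    JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
) AS u(acc);
```

If this runs and the query against the adults table does not, that would point
at the connector setup rather than at the query itself.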

G

On Tue, Sep 16, 2025 at 8:59 AM dylanhz <[email protected]> wrote:

> Hi George,
>
> Sorry I'm not familiar with the CDC connector details, but as long as your
> data is of STRING type, you can extract fields from elements of a JSON
> array with a query like this:
>
> SELECT
>     JSON_VALUE(acc, '$.accountId') AS account_id,
>     JSON_VALUE(acc, '$.accountType') AS account_type,
>     JSON_VALUE(acc, '$.memberName') AS member_name
> FROM postgres_catalog.inbound.adults AS t
> CROSS JOIN UNNEST(
>     JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
> ) AS acc;
>
>
> --
> Best regards,
> dylanhz
>
> At 2025-09-16 13:34:53, "George" <[email protected]> wrote:
>
> Hi there
>
> Thanks for the reply, appreciated. I'm really seeing double with this one
> and have tried everything. Yes, it's simply a lack of experience on my
> side. I would really appreciate assistance to get this working...
>
> if we look at:  *Since Flink 1.20.2, if you need to work with a JSON
> array for further processing, you can use:*
>
> *JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)*
>
> To me, that implies the following:
>
> select JSON_QUERY(data, '$.account') AS accounts FROM
> postgres_catalog.inbound.adults;
>
> I would expect this to give me an array of the account documents/objects,
> still as strings at this point, which I can then unpack/unwind one by one.
> Instead, I get:
>
> Flink SQL> select JSON_QUERY(data, '$.account')       AS accounts FROM
> postgres_catalog.inbound.adults;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.streaming.api.functions.source.SourceFunction
>
>
> Some background: I have that adults.json coming into a column called data,
> stored as JSONB in a PostgreSQL table, which is CDC consumed/pushed into
> Flink. I'm trying to unpack this into a properly structured (complex-typed)
> Flink table. Once I have it there, I can unwind the account and address
> objects into other tables. As I also have accounts and addresses coming from
> other tables, I will consolidate them all into one table to be sunk to
> Paimon and pushed to Kafka for consumption as structured objects.
>
>
>
> On Tue, Sep 16, 2025 at 4:59 AM dylanhz <[email protected]> wrote:
>
>> Hi George,
>>
>>
>> JSON_QUERY returns a STRING by default (the JSON fragment as text).
>> Directly casting this string into complex types such as ARRAY<ROW<...>>
>> is not supported, so attempting to do so results in a type conversion
>> error.
>>
>>
>> Since Flink 1.20.2, if you need to work with a JSON array for further
>> processing, you can use:
>>
>> JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)
>>
>> This returns the JSON array as an array of string elements, each
>> representing one array item. You can then UNNEST this array and apply
>> JSON_VALUE on each element to extract the required fields.
>>
>> For more details, please refer to the Flink documentation:
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/systemfunctions/
>>
>>
>> --
>> Best regards,
>> dylanhz
>>
>> ----- Original Message -----
>> *From*: George <[email protected]>
>> *To*: [email protected]
>> *Sent*: Mon, 15 Sep 2025 19:35:57 +0200
>> *Subject*: Cast error and battling to get around it.
>>
>>
>> I'm stuck with this; everywhere I look, I find different ideas...
>>
>> data is a full string payload, as per the attached example.
>>
>> The data.account field holds an array of accounts.
>>
>> Flink 2.2 seems to be getting some new functionality, from_json, but it's
>> not in 1.20, and 2.0 is not allowing me to CDC consume from a PostgreSQL
>> table, even when using the newest 3.4 library.
>>
>>
>> I've now hit the "cannot cast" error below on several attempts...
>>
>> ==>   Cast function cannot convert value of type VARCHAR(2147483647) to
>> type VARCHAR(2147483647) ARRAY
>>
>> Been stuck with this one for a while, hope someone can help.
>>
>> Example payload attached.
>>
>> Flink SQL> select
>> >      JSON_VALUE(data, '$.nationalid')       AS nationalid
>> >     ,JSON_VALUE(data, '$._id')              AS _id              -- UUID generated by app, inside 'data' / json payload
>> >     ,JSON_VALUE(data, '$.name')             AS name
>> >     ,JSON_VALUE(data, '$.surname')          AS surname
>> >     ,JSON_VALUE(data, '$.gender')           AS gender
>> >     ,JSON_VALUE(data, '$.dob')              AS dob
>> >     ,JSON_VALUE(data, '$.marital_status')   AS marital_status
>> >     ,JSON_VALUE(data, '$.status')           AS status
>> >     ,JSON_QUERY(data, '$.address')          AS address
>> >     ,CAST(
>> >         JSON_QUERY(data, '$.account')       AS
>> >         ARRAY<ROW<
>> >              fspiagentaccountid STRING
>> >             ,accountid   STRING
>> >             ,fspiid      STRING
>> >             ,fspiagentid STRING
>> >             ,accounttype STRING
>> >             ,membername  STRING
>> >             ,cardholder  STRING
>> >             ,cardnumber  STRING
>> >             ,expdate     STRING
>> >             ,cardnetwork STRING
>> >             ,issuingbank STRING
>> >         >>
>> >     ) AS account
>> >     ,created_at                             AS created_at
>> > FROM postgres_catalog.inbound.adults;
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.calcite.sql.validate.SqlValidatorException: Cast function
>> cannot convert value of type VARCHAR(2147483647) to type
>> RecordType(VARCHAR(2147483647) fspiagentaccountid, VARCHAR(2147483647)
>> accountid, VARCHAR(2147483647) fspiid, VARCHAR(2147483647) fspiagentid,
>> VARCHAR(2147483647) accounttype, VARCHAR(2147483647) membername,
>> VARCHAR(2147483647) cardholder, VARCHAR(2147483647) cardnumber,
>> VARCHAR(2147483647) expdate, VARCHAR(2147483647) cardnetwork,
>> VARCHAR(2147483647) issuingbank) ARRAY
>>
>> Flink SQL> WITH unnested_accounts AS (
>> >     SELECT
>> >          JSON_VALUE(account_item, '$.fspiAgentAccountId') AS fspiagentaccountid
>> >         ,JSON_VALUE(account_item, '$.accountId')         AS accountid
>> >         ,JSON_VALUE(account_item, '$.fspiId')            AS fspiid
>> >         ,JSON_VALUE(account_item, '$.fspiAgentId')       AS fspiagentid
>> >         ,JSON_VALUE(account_item, '$.accountType')       AS accounttype
>> >         ,JSON_VALUE(account_item, '$.memberName')        AS membername
>> >         ,JSON_VALUE(account_item, '$.cardHolder')        AS cardholder
>> >         ,JSON_VALUE(account_item, '$.cardNumber')        AS cardnumber
>> >         ,JSON_VALUE(account_item, '$.expDate')           AS expdate
>> >         ,JSON_VALUE(account_item, '$.cardNetwork')       AS cardnetwork
>> >         ,JSON_VALUE(account_item, '$.issuingBank')       AS issuingbank
>> >     FROM postgres_catalog.inbound.adults
>> >     CROSS JOIN UNNEST(
>> >         CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>)
>> >     ) AS t(account_item)
>> > )
>> > SELECT
>> >     ARRAY_AGG(
>> >         ROW(
>> >              fspiagentaccountid
>> >             ,accountid
>> >             ,fspiid
>> >             ,fspiagentid
>> >             ,accounttype
>> >             ,membername
>> >             ,cardholder
>> >             ,cardnumber
>> >             ,expdate
>> >             ,cardnetwork
>> >             ,issuingbank
>> >         )
>> >     ) AS accounts
>> > FROM unnested_accounts;
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.calcite.sql.validate.SqlValidatorException: Cast function
>> cannot convert value of type VARCHAR(2147483647) to type
>> VARCHAR(2147483647) ARRAY
>>
>>
>>
>>
>> --
>> You have the obligation to inform one honestly of the risk, and as a
>> person
>> you are committed to educate yourself to the total risk in any activity!
>>
>> Once informed & totally aware of the risk,
>> every fool has the right to kill or injure themselves as they see fit!
>>
>>
>

