Flink 1.20.x does not let me unpack/repack the JSON string into an array
of rows as a complex structure...

My inbound JSON payload is packed into a column called "data" of type
jsonb...

With the 1.20.x functionality I can't unpack/JSON-ify it... the
functionality needed is only in 2.2, and I have yet to get it working on a
lower version, even after a lot of attempts and suggestions.
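
Roughly what I am trying to do (a sketch only; the '$.name' and
'$.children' paths are made-up stand-ins for my actual payload):

SELECT
id
,nationalid
,JSON_VALUE(data, '$.name') AS name -- scalar extraction works on 1.20.x
,JSON_QUERY(data, '$.children') AS children -- comes back as a STRING,
FROM postgres_catalog.inbound.children; -- not as ARRAY<ROW<...>>

i.e. on 1.20 I can pull out scalars, but as far as I can tell I cannot
explode the array inside "data" into proper rows.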

G

On Thu, Sep 18, 2025 at 3:55 AM Hongshun Wang <[email protected]>
wrote:

> Hi George,
> Please use 'scan.incremental.snapshot.enabled' = 'true'. The old
> SourceFunction has been removed in Flink 2.0. Otherwise, you can use Flink
> 1.20.
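>
> For example (a sketch only; everything else in your table definition
> stays as it is):
>
> CREATE OR REPLACE TABLE postgres_catalog.inbound.children (
>   ...
> ) WITH (
>   'connector' = 'postgres-cdc'
>   ...
>   ,'scan.incremental.snapshot.enabled' = 'true'
> );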
>
> Best,
> Hongshun
>
> On Sun, Sep 14, 2025 at 11:48 PM George <[email protected]> wrote:
>
>> Hi all...
>>
>> Below are the jars included in my Flink 2.0 build, followed by the
>> catalog create and the query... If I drop Flink to 1.20.2 and the
>> associated jars, everything works, but on 2.0 I'm a bit stuck...
>>
>> *Dockerfile*
>> RUN echo "--> Install JARs: Flink's S3 plugin" && \
>> mkdir ./plugins/s3-fs-hadoop && \
>> mv ./opt/flink-s3-fs-hadoop-2.0.0.jar ./plugins/s3-fs-hadoop/
>>
>> RUN echo "--> Install Flink JARs: Generic"
>> COPY stage/bundle-2.31.9.jar /opt/flink/lib/bundle-2.31.9.jar
>> COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
>> COPY stage/flink-sql-connector-postgres-cdc-3.4.0.jar /opt/flink/lib/flink-sql-connector-postgres-cdc-3.4.0.jar
>> COPY stage/postgresql-42.7.6.jar /opt/flink/lib/postgresql-42.7.6.jar
>>
>> # Required by Debezium
>> COPY stage/flink-sql-json-2.0.0.jar /opt/flink/lib/flink-sql-json-2.0.0.jar
>> COPY stage/flink-json-2.0.0.jar /opt/flink/lib/flink-json-2.0.0.jar
>> COPY stage/flink-sql-parquet-2.0.0.jar /opt/flink/lib/flink-sql-parquet-2.0.0.jar
>>
>> # Paimon
>> COPY stage/paimon-flink-2.0-1.2.0.jar /opt/flink/lib/paimon-flink-2.0-1.2.0.jar
>>
>> RUN chown -R flink:flink /opt/flink
>>
>> *create_cat.sql*
>> -- Inbound from PostgreSQL via CDC
>> CREATE CATALOG postgres_catalog WITH
>> ('type'='generic_in_memory');
>>
>> *create_inbound_cdc.sql*
>> CREATE OR REPLACE TABLE postgres_catalog.inbound.children (
>> id BIGINT -- This is a postgresql Serial generated field
>> ,nationalid VARCHAR(14) -- NOT NULL
>> ,data STRING -- JSONB Payload
>> ,created_at TIMESTAMP_LTZ(3)
>> ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
>> ,PRIMARY KEY (nationalid) NOT ENFORCED
>> ) WITH (
>> 'connector' = 'postgres-cdc'
>> ,'hostname' = 'postgrescdc'
>> ,'port' = '5432' -- NOTE: this is the port of the db on the container,
>> -- not the external docker exported port via a port mapping.
>> ,'username' = 'dbadmin'
>> ,'password' = 'dbpassword'
>> ,'database-name' = 'demog'
>> ,'schema-name' = 'public'
>> ,'table-name' = 'children'
>> ,'slot.name' = 'children'
>> ,'scan.incremental.snapshot.enabled' = 'false' -- experimental feature: incremental snapshot
>> ,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
>> ,'decoding.plugin.name' = 'pgoutput'
>> );
>>
>>
>> *Flink SQL> select * from postgres_catalog.inbound.children;*
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.ClassNotFoundException:
>> org.apache.flink.streaming.api.functions.source.SourceFunction
>> Flink SQL>
>>
>> G
>> --
>> You have the obligation to inform one honestly of the risk, and as a
>> person
>> you are committed to educate yourself to the total risk in any activity!
>>
>> Once informed & totally aware of the risk,
>> every fool has the right to kill or injure themselves as they see fit!
>>
>

-- 
You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!

Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!
