Hi all / Hong.

I picked up a slight contradiction: according to the docs, Flink CDC 3.4
(Postgres) is currently not compatible with Flink 2.x:
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions

So the setting suggested earlier won't make a difference.
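
For anyone who wants to confirm which jar is out of step, here is a minimal
sketch (Python, standard library only) that checks whether a jar contains the
class named in the ClassNotFoundException. The helper name jar_has_class and
the script name check_class.py are just illustrative, not part of Flink or
Flink CDC.

```python
import sys
import zipfile


def jar_has_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar (a zip archive) contains the fully qualified class."""
    # A class a.b.C is stored in the archive as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


if __name__ == "__main__":
    # Class taken from the error message in this thread
    wanted = "org.apache.flink.streaming.api.functions.source.SourceFunction"
    # Usage: python check_class.py <jar> [<jar> ...]
    for path in sys.argv[1:]:
        print(path, jar_has_class(path, wanted))
```

Run it against the jars in /opt/flink/lib. If none of the Flink 2.0 jars
reports True, the connector is asking for a class that this Flink build does
not ship at that path, which would match the error.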

G

On Thu, Sep 18, 2025 at 4:57 PM George <[email protected]> wrote:

> Hi Wang
>
> As per my previous message, this is already how my CDC is configured.
> Any other ideas?
>
> Here is my table definition.
>
> CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
> id BIGINT -- This is a postgresql Serial generated field
> ,nationalid VARCHAR(14) -- NOT NULL
> ,data STRING -- JSONB Payload
> ,created_at TIMESTAMP_LTZ(3)
> ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
> ,PRIMARY KEY (nationalid) NOT ENFORCED
> ) WITH (
> 'connector' = 'postgres-cdc'
> ,'hostname' = 'postgrescdc'
> ,'port' = '5432' -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
> ,'username' = 'dbadmin'
> ,'password' = 'dbpassword'
> ,'database-name' = 'demog'
> ,'schema-name' = 'public'
> ,'table-name' = 'adults'
> ,'slot.name' = 'adults0'
> ,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
> ,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
> ,'decoding.plugin.name' = 'pgoutput'
> );
>
> G
>
> On Thu, Sep 18, 2025 at 3:55 AM Hongshun Wang <[email protected]>
> wrote:
>
>> Hi George,
>> Please set 'scan.incremental.snapshot.enabled' = 'true'. The old
>> SourceFunction was removed in Flink 2.0. Otherwise, you can use Flink
>> 1.20.
>>
>> Best,
>> Hongshun
>>
>> On Sun, Sep 14, 2025 at 11:48 PM George <[email protected]> wrote:
>>
>>> Hi all...
>>>
>>> Below are the jars included in my Flink 2.0 build, followed by the catalog
>>> create and the query. If I drop Flink to 1.20.2 and the associated jars then
>>> all works, but for 2.0 I'm a bit stuck...
>>>
>>> *Dockerfile*
>>> RUN echo "--> Install JARs: Flink's S3 plugin" && \
>>> mkdir ./plugins/s3-fs-hadoop && \
>>> mv ./opt/flink-s3-fs-hadoop-2.0.0.jar ./plugins/s3-fs-hadoop/
>>>
>>> RUN echo "--> Install Flink JARs: Generic"
>>> COPY stage/bundle-2.31.9.jar /opt/flink/lib/bundle-2.31.9.jar
>>> COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
>>> COPY stage/flink-sql-connector-postgres-cdc-3.4.0.jar /opt/flink/lib/flink-sql-connector-postgres-cdc-3.4.0.jar
>>> COPY stage/postgresql-42.7.6.jar /opt/flink/lib/postgresql-42.7.6.jar
>>>
>>> # Required by Debezium
>>> COPY stage/flink-sql-json-2.0.0.jar /opt/flink/lib/flink-sql-json-2.0.0.jar
>>> COPY stage/flink-json-2.0.0.jar /opt/flink/lib/flink-json-2.0.0.jar
>>> COPY stage/flink-sql-parquet-2.0.0.jar /opt/flink/lib/flink-sql-parquet-2.0.0.jar
>>>
>>> # Paimon
>>> COPY stage/paimon-flink-2.0-1.2.0.jar /opt/flink/lib/paimon-flink-2.0-1.2.0.jar
>>>
>>> RUN chown -R flink:flink /opt/flink
>>>
>>> create_cat.sql
>>> -- Inbound from PostgreSQL via CDC
>>> CREATE CATALOG postgres_catalog WITH
>>> ('type'='generic_in_memory');
>>>
>>> Create_inbound_cdc.sql
>>> CREATE OR REPLACE TABLE postgres_catalog.inbound.children (
>>> id BIGINT -- This is a postgresql Serial generated field
>>> ,nationalid VARCHAR(14) -- NOT NULL
>>> ,data STRING -- JSONB Payload
>>> ,created_at TIMESTAMP_LTZ(3)
>>> ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
>>> ,PRIMARY KEY (nationalid) NOT ENFORCED
>>> ) WITH (
>>> 'connector' = 'postgres-cdc'
>>> ,'hostname' = 'postgrescdc'
>>> ,'port' = '5432' -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
>>> ,'username' = 'dbadmin'
>>> ,'password' = 'dbpassword'
>>> ,'database-name' = 'demog'
>>> ,'schema-name' = 'public'
>>> ,'table-name' = 'children'
>>> ,'slot.name' = 'children'
>>> ,'scan.incremental.snapshot.enabled' = 'false' -- experimental feature: incremental snapshot (default off)
>>> ,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
>>> ,'decoding.plugin.name' = 'pgoutput'
>>> );
>>>
>>>
>>> *Flink SQL> select * from postgres_catalog.inbound.children;*
>>> [ERROR] Could not execute SQL statement. Reason:
>>> java.lang.ClassNotFoundException:
>>> org.apache.flink.streaming.api.functions.source.SourceFunction
>>> Flink SQL>
>>>
>>> G
>>> --
>>> You have the obligation to inform one honestly of the risk, and as a
>>> person
>>> you are committed to educate yourself to the total risk in any activity!
>>>
>>> Once informed & totally aware of the risk,
>>> every fool has the right to kill or injure themselves as they see fit!
>>>
>>
>

