Hi all / Hong. Slight contradiction picked up, currently Flink CDC 3.4 (Postgres) is not compatible with Flink 2.x https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions
So the setting above wont make a difference. G On Thu, Sep 18, 2025 at 4:57 PM George <[email protected]> wrote: > Hi Wang > > As per previous, this is already how my CDC is configured. > any other ideas. > > here is my table create. > > CREATE OR REPLACE TABLE postgres_catalog.inbound.adults ( > id BIGINT -- This is a postgresql Serial generated field > ,nationalid VARCHAR(14) -- NOT NULL > ,data STRING -- JSONB Payload > ,created_at TIMESTAMP_LTZ(3) > ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND > ,PRIMARY KEY (nationalid) NOT ENFORCED > ) WITH ( > 'connector' = 'postgres-cdc' > ,'hostname' = 'postgrescdc' > ,'port' = '5432' -- NOTE: this is the port of the db on the container, > not the external docker exported port via a port mapping. > ,'username' = 'dbadmin' > ,'password' = 'dbpassword' > ,'database-name' = 'demog' > ,'schema-name' = 'public' > ,'table-name' = 'adults' > ,'slot.name' = 'adults0' > ,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: > incremental snapshot (default off) > ,'scan.startup.mode' = 'initial' -- > https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position > ,'decoding.plugin.name' = 'pgoutput' > ); > > G > > On Thu, Sep 18, 2025 at 3:55 AM Hongshun Wang <[email protected]> > wrote: > >> Hi George, >> Please use ,'scan.incremental.snapshot.enabled' = 'true' . The old >> SouceFunction has been removed in flink 2.0. Otherwise, you can use flink >> 1.20. >> >> Best, >> Hongshun >> >> On Sun, Sep 14, 2025 at 11:48 PM George <[email protected]> wrote: >> >>> Hi all... >>> >>> Below is the jars included in my flink 2.0 build. and then the catalog >>> create and the query... If I drop flink to 1.20.2 and associated jars then >>> all works, but for 2.0 I'm a bit stuck... >>> >>> *Dockerfile* >>> RUN echo "--> Install JARs: Flink's S3 plugin" && \ >>> mkdir ./plugins/s3-fs-hadoop && \ >>> mv ./opt/flink-s3-fs-hadoop-2.0.0.jar ./plugins/s3-fs-hadoop/ >>> >>> RUN echo "--> Install Flink JARs: Generic" >>> COPY stage/bundle-2.31.9.jar /opt/flink/lib/bundle-2.31.9.jar >>> COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar >>> /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar >>> COPY stage/flink-sql-connector-postgres-cdc-3.4.0.jar >>> /opt/flink/lib/flink-sql-connector-postgres-cdc-3.4.0.jar >>> COPY stage/postgresql-42.7.6.jar /opt/flink/lib/postgresql-42.7.6.jar >>> >>> # Required by Debezium >>> COPY stage/flink-sql-json-2.0.0.jar >>> /opt/flink/lib/flink-sql-json-2.0.0.jar >>> COPY stage/flink-json-2.0.0.jar >>> /opt/flink/lib/stage/flink-json-2.0.0.jar >>> COPY stage/flink-sql-parquet-2.0.0.jar >>> /opt/flink/lib/flink-sql-parquet-2.0.02.jar >>> >>> # Paimon >>> COPY stage/paimon-flink-2.0-1.2.0.jar >>> /opt/flink/lib/paimon-flink-2.0-1.2.0.jar >>> >>> RUN chown -R flink:flink /opt/flink >>> >>> create_cat.sql >>> -- Inbound from PostgreSQL via CDC >>> CREATE CATALOG postgres_catalog WITH >>> ('type'='generic_in_memory'); >>> >>> Create_inboude_cdc.sql >>> CREATE OR REPLACE TABLE postgres_catalog.inbound.children ( >>> id BIGINT -- This is a postgresql Serial generated field >>> ,nationalid VARCHAR(14) -- NOT NULL >>> ,data STRING -- JSONB Payload >>> ,created_at TIMESTAMP_LTZ(3) >>> ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND >>> ,PRIMARY KEY (nationalid) NOT ENFORCED >>> ) WITH ( >>> 'connector' = 'postgres-cdc' >>> ,'hostname' = 'postgrescdc' >>> ,'port' = '5432' -- NOTE: this is the port of the db on the container, >>> not the external docker exported port via a port mapping. >>> ,'username' = 'dbadmin' >>> ,'password' = 'dbpassword' >>> ,'database-name' = 'demog' >>> ,'schema-name' = 'public' >>> ,'table-name' = 'children' >>> ,'slot.name' = 'children' >>> ,'scan.incremental.snapshot.enabled' = 'false' -- experimental feature: >>> incremental snapshot (default of >>> ,'scan.startup.mode' = 'initial' -- >>> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position >>> ,'decoding.plugin.name' = 'pgoutput' >>> ); >>> >>> >>> *Flink SQL> select * from postgres_catalog.inbound.children;* >>> [ERROR] Could not execute SQL statement. Reason: >>> java.lang.ClassNotFoundException: >>> org.apache.flink.streaming.api.functions.source.SourceFunction >>> Flink SQL> >>> >>> G >>> -- >>> You have the obligation to inform one honestly of the risk, and as a >>> person >>> you are committed to educate yourself to the total risk in any activity! >>> >>> Once informed & totally aware of the risk, >>> every fool has the right to kill or injure themselves as they see fit! >>> >> > > -- > You have the obligation to inform one honestly of the risk, and as a person > you are committed to educate yourself to the total risk in any activity! > > Once informed & totally aware of the risk, > every fool has the right to kill or injure themselves as they see fit! > -- You have the obligation to inform one honestly of the risk, and as a person you are committed to educate yourself to the total risk in any activity! Once informed & totally aware of the risk, every fool has the right to kill or injure themselves as they see fit!
