Hi Wang As per previous, this is already how my CDC is configured. any other ideas.
here is my table create. CREATE OR REPLACE TABLE postgres_catalog.inbound.adults ( id BIGINT -- This is a postgresql Serial generated field ,nationalid VARCHAR(14) -- NOT NULL ,data STRING -- JSONB Payload ,created_at TIMESTAMP_LTZ(3) ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND ,PRIMARY KEY (nationalid) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc' ,'hostname' = 'postgrescdc' ,'port' = '5432' -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping. ,'username' = 'dbadmin' ,'password' = 'dbpassword' ,'database-name' = 'demog' ,'schema-name' = 'public' ,'table-name' = 'adults' ,'slot.name' = 'adults0' ,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off) ,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position ,'decoding.plugin.name' = 'pgoutput' ); G On Thu, Sep 18, 2025 at 3:55 AM Hongshun Wang <[email protected]> wrote: > Hi George, > Please use ,'scan.incremental.snapshot.enabled' = 'true' . The old > SouceFunction has been removed in flink 2.0. Otherwise, you can use flink > 1.20. > > Best, > Hongshun > > On Sun, Sep 14, 2025 at 11:48 PM George <[email protected]> wrote: > >> Hi all... >> >> Below is the jars included in my flink 2.0 build. and then the catalog >> create and the query... If I drop flink to 1.20.2 and associated jars then >> all works, but for 2.0 I'm a bit stuck... >> >> *Dockerfile* >> RUN echo "--> Install JARs: Flink's S3 plugin" && \ >> mkdir ./plugins/s3-fs-hadoop && \ >> mv ./opt/flink-s3-fs-hadoop-2.0.0.jar ./plugins/s3-fs-hadoop/ >> >> RUN echo "--> Install Flink JARs: Generic" >> COPY stage/bundle-2.31.9.jar /opt/flink/lib/bundle-2.31.9.jar >> COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar >> /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar >> COPY stage/flink-sql-connector-postgres-cdc-3.4.0.jar >> /opt/flink/lib/flink-sql-connector-postgres-cdc-3.4.0.jar >> COPY stage/postgresql-42.7.6.jar /opt/flink/lib/postgresql-42.7.6.jar >> >> # Required by Debezium >> COPY stage/flink-sql-json-2.0.0.jar >> /opt/flink/lib/flink-sql-json-2.0.0.jar >> COPY stage/flink-json-2.0.0.jar /opt/flink/lib/stage/flink-json-2.0.0.jar >> COPY stage/flink-sql-parquet-2.0.0.jar >> /opt/flink/lib/flink-sql-parquet-2.0.02.jar >> >> # Paimon >> COPY stage/paimon-flink-2.0-1.2.0.jar >> /opt/flink/lib/paimon-flink-2.0-1.2.0.jar >> >> RUN chown -R flink:flink /opt/flink >> >> create_cat.sql >> -- Inbound from PostgreSQL via CDC >> CREATE CATALOG postgres_catalog WITH >> ('type'='generic_in_memory'); >> >> Create_inboude_cdc.sql >> CREATE OR REPLACE TABLE postgres_catalog.inbound.children ( >> id BIGINT -- This is a postgresql Serial generated field >> ,nationalid VARCHAR(14) -- NOT NULL >> ,data STRING -- JSONB Payload >> ,created_at TIMESTAMP_LTZ(3) >> ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND >> ,PRIMARY KEY (nationalid) NOT ENFORCED >> ) WITH ( >> 'connector' = 'postgres-cdc' >> ,'hostname' = 'postgrescdc' >> ,'port' = '5432' -- NOTE: this is the port of the db on the container, >> not the external docker exported port via a port mapping. >> ,'username' = 'dbadmin' >> ,'password' = 'dbpassword' >> ,'database-name' = 'demog' >> ,'schema-name' = 'public' >> ,'table-name' = 'children' >> ,'slot.name' = 'children' >> ,'scan.incremental.snapshot.enabled' = 'false' -- experimental feature: >> incremental snapshot (default of >> ,'scan.startup.mode' = 'initial' -- >> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position >> ,'decoding.plugin.name' = 'pgoutput' >> ); >> >> >> *Flink SQL> select * from postgres_catalog.inbound.children;* >> [ERROR] Could not execute SQL statement. Reason: >> java.lang.ClassNotFoundException: >> org.apache.flink.streaming.api.functions.source.SourceFunction >> Flink SQL> >> >> G >> -- >> You have the obligation to inform one honestly of the risk, and as a >> person >> you are committed to educate yourself to the total risk in any activity! >> >> Once informed & totally aware of the risk, >> every fool has the right to kill or injure themselves as they see fit! >> > -- You have the obligation to inform one honestly of the risk, and as a person you are committed to educate yourself to the total risk in any activity! Once informed & totally aware of the risk, every fool has the right to kill or injure themselves as they see fit!
