[
https://issues.apache.org/jira/browse/FLINK-37479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17946395#comment-17946395
]
Hongshun Wang commented on FLINK-37479:
---------------------------------------
[~phamvinh1712] I have tested it and you are right; let's push it forward. My test
steps are below.
# Prepare a PostgreSQL (13+) instance and create a partitioned table.
{code:java}
-- Parent table, range-partitioned on logdate.
CREATE TABLE measurement (
    city_id INT NOT NULL,
    logdate DATE NOT NULL
) PARTITION BY RANGE (logdate);

-- Partition for logdate in [2006-02-01, 2006-03-01).
CREATE TABLE measurement_y2006m02
    PARTITION OF measurement
    FOR VALUES FROM ('2006-02-01') TO ('2006-03-01');

-- Partition for logdate in [2006-03-01, 2006-04-01).
CREATE TABLE measurement_y2006m03
    PARTITION OF measurement
    FOR VALUES FROM ('2006-03-01') TO ('2006-04-01'); {code}
# Create a publication with publish_via_partition_root enabled in PostgreSQL.
{code:java}
-- Log the full old row for UPDATE/DELETE changes on the partitioned table.
ALTER TABLE public.measurement REPLICA IDENTITY FULL;

-- Publish changes made to the partitions under the root table (public.measurement).
CREATE PUBLICATION test_partition_publication
    FOR TABLE public.measurement
    WITH (publish_via_partition_root = true); {code}
# Create a Flink CDC job that uses this publication and reads only the latest data.
{code:java}
-- Source table: captures changes of public.measurement via the pgoutput plugin,
-- starting from the latest offset. Publication auto-creation is disabled, so the
-- publication must already exist in PostgreSQL.
-- NOTE(review): 'debezium.publication.name' is not set here, so Debezium presumably
-- falls back to its default publication name; confirm the job really uses
-- test_partition_publication.
CREATE TEMPORARY TABLE pg_source (
    city_id INT NOT NULL,
    logdate DATE NOT NULL
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '<yourHostname>',
    'port' = '5432',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>',
    'database-name' = '<yourDatabaseName>',
    'schema-name' = 'public',
    'table-name' = 'measurement',
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.startup.mode' = 'latest-offset',
    'debezium.publication.autocreate.mode' = 'disabled',
    'decoding.plugin.name' = 'pgoutput',
    -- Last option: no trailing comma, otherwise the statement fails to parse.
    'slot.name' = 'test_partition'
);

-- Sink table: prints every received change row to the task manager output.
CREATE TEMPORARY TABLE print_sink (
    city_id INT NOT NULL,
    logdate DATE NOT NULL
) WITH (
    'connector' = 'print'
);

-- Pipe all captured changes from the source into the print sink.
INSERT INTO print_sink
SELECT
    city_id,
    logdate
FROM pg_source; {code}
# After the job starts, insert data into the source table.
{code:java}
-- One row per partition: 2006-02-03 falls into measurement_y2006m02, 2006-03-02 into measurement_y2006m03.
INSERT INTO measurement (city_id, logdate) VALUES (1, '2006-02-03'), (2, '2006-03-02'); {code}
# Finally, we can see that the Flink job prints:
{code:java}
+I[1, 2006-02-03]
+I[2, 2006-03-02] {code}
> postgres cdc connector support discover PARTITIONED TABLE
> ---------------------------------------------------------
>
> Key: FLINK-37479
> URL: https://issues.apache.org/jira/browse/FLINK-37479
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Reporter: Vinh Pham
> Priority: Minor
>
> At the moment, the PostgreSQL connector doesn't support discovering PARTITIONED
> TABLEs, which makes it impossible for the connector to take a snapshot of a
> partitioned table.
> [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java#L41]
>
> This is inconsistent with the other implementation in the PostgresConnection
> class:
> [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java#L859]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)