Anil Dasari created FLINK-36584:
-----------------------------------
Summary: PostgresIncrementalSource does not exit the Flink execution when StartupOptions is snapshot, and creates multiple replication slots
Key: FLINK-36584
URL: https://issues.apache.org/jira/browse/FLINK-36584
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: 3.0.0
Reporter: Anil Dasari
Issue-1. PostgresIncrementalSource does not exit the Flink execution when StartupOptions is snapshot.
The Postgres CDC module uses HybridSplitAssigner for the batch scan, and HybridSplitAssigner still tries to create a stream split once the snapshot splits are completed:
[https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128]
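To make Issue-1 concrete, here is a toy Java model of the behavior the report expects. It is not the actual HybridSplitAssigner code: the class HybridAssignerSketch, its Mode enum and getNext() are hypothetical, and for brevity a snapshot split counts as done as soon as it is handed out. In snapshot-only mode the model returns no further split once the snapshot splits are exhausted, which is what would let a bounded job finish. In the other mode it hands out a single stream split, which is the step the report says the real assigner still takes under StartupOptions.snapshot().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical toy model: names and logic are illustrative only and do not
// reproduce Flink CDC's HybridSplitAssigner.
public class HybridAssignerSketch {

    // Hypothetical startup modes, loosely mirroring StartupOptions.
    enum Mode { SNAPSHOT_ONLY, INITIAL }

    private final Mode mode;
    private final Deque<String> pendingSnapshotSplits;
    private boolean streamSplitAssigned = false;

    HybridAssignerSketch(Mode mode, List<String> snapshotSplits) {
        this.mode = mode;
        this.pendingSnapshotSplits = new ArrayDeque<>(snapshotSplits);
    }

    /** Returns the next split, or empty when there is nothing left to read. */
    Optional<String> getNext() {
        if (!pendingSnapshotSplits.isEmpty()) {
            return Optional.of(pendingSnapshotSplits.poll());
        }
        // Snapshot splits are exhausted. In snapshot-only mode the expected
        // behavior is to stop here so the bounded job can finish, instead of
        // moving on to a stream split.
        if (mode == Mode.SNAPSHOT_ONLY || streamSplitAssigned) {
            return Optional.empty();
        }
        streamSplitAssigned = true;
        return Optional.of("stream-split");
    }

    public static void main(String[] args) {
        HybridAssignerSketch snapshotOnly =
                new HybridAssignerSketch(Mode.SNAPSHOT_ONLY, List.of("t1:0", "t2:0"));
        Optional<String> split;
        while ((split = snapshotOnly.getNext()).isPresent()) {
            System.out.println("snapshot-only -> " + split.get());
        }
        System.out.println("snapshot-only -> no more splits");
    }
}
```

Running main prints the two snapshot splits ("t1:0", "t2:0") followed by "snapshot-only -> no more splits".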
Issue-2. When the source parallelism is greater than 1 (here, 2), PostgresIncrementalSource creates multiple replication slots.
Postgres logs:
{code:java}
flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: logical decoding found consistent point at 0/1690B28
flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: exported logical decoding snapshot: "00000006-00000003-1" with 0 transaction IDs
flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: exported logical decoding snapshot: "00000008-00000003-1" with 0 transaction IDs
flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] LOG: logical decoding found consistent point at 0/1690C30
flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-22 18:56:58.267 UTC [52] LOG: exported logical decoding snapshot: "00000009-00000003-1" with 0 transaction IDs
flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] LOG: starting logical decoding for slot "flinkpostgres_0"
flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690BF8.
flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] ERROR: replication slot "flinkpostgres_1" does not exist
flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] STATEMENT: select pg_drop_replication_slot('flinkpostgres_1')
flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] LOG: starting logical decoding for slot "flinkpostgres_0"
flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690C30.
flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] LOG: logical decoding found consistent point at 0/1690C30
flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] ERROR: replication slot "flinkpostgres_0" does not exist
flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] ERROR: replication slot "flinkpostgres_0" does not exist
flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
{code}
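To make the slot count in the Postgres log easier to check, here is a minimal Java sketch that collects the distinct replication slot names quoted in log lines. SlotLogScan and slotNames() are hypothetical helpers, not part of Flink CDC. Applied to entries from the log excerpt, it finds both flinkpostgres_0 and flinkpostgres_1; reading the _0/_1 suffixes as one slot per source subtask at parallelism 2 is an inference, not something the log states.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical diagnostic helper, not part of Flink CDC.
public class SlotLogScan {

    // Matches a double-quoted slot name in messages such as
    //   starting logical decoding for slot "flinkpostgres_0"
    //   replication slot "flinkpostgres_1" does not exist
    private static final Pattern SLOT = Pattern.compile("slot \"([^\"]+)\"");

    /** Returns the distinct, sorted replication slot names mentioned in the log lines. */
    static Set<String> slotNames(List<String> logLines) {
        Set<String> names = new TreeSet<>();
        for (String line : logLines) {
            Matcher m = SLOT.matcher(line);
            while (m.find()) {
                names.add(m.group(1));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> excerpt = List.of(
                "flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] LOG: starting logical decoding for slot \"flinkpostgres_0\"",
                "flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] ERROR: replication slot \"flinkpostgres_1\" does not exist",
                "flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] ERROR: replication slot \"flinkpostgres_0\" does not exist");
        System.out.println(slotNames(excerpt)); // [flinkpostgres_0, flinkpostgres_1]
    }
}
```

The pattern only matches double-quoted names, so the `select pg_drop_replication_slot('...')` STATEMENT lines are skipped; the ERROR lines just before them already name the same slots.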
Setup:
flink-cdc version: 3.2.0
Flink version: 1.19
Steps to reproduce the issue:
1. Main code:
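{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical wrapper class; the report only shows the body of main().
public class PostgresSnapshotJob {
    public static void main(String[] args) throws Exception {
        DebeziumDeserializationSchema<String> deserializer =
                new JsonDebeziumDeserializationSchema();
        JdbcIncrementalSource<String> postgresIncrementalSource =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        // Snapshot-only startup: the job should finish after the snapshot (Issue-1).
                        .startupOptions(StartupOptions.snapshot())
                        .hostname("localhost")
                        .port(5432)
                        .database("test")
                        .schemaList("public")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flinkpostgres")
                        .decodingPluginName("pgoutput")
                        .deserializer(deserializer)
                        // .splitSize(2)
                        .build();

        Configuration config = new Configuration();
        config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
        config.setString("heartbeat.interval", "6000000"); // 100 minutes
        config.setString("heartbeat.timeout", "18000000"); // 300 minutes
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.enableCheckpointing(300000); // 5 minutes
        env.fromSource(
                        postgresIncrementalSource,
                        WatermarkStrategy.noWatermarks(),
                        "PostgresParallelSource")
                // Source parallelism > 1 is where multiple replication slots appear (Issue-2).
                .setParallelism(2)
                .print();
        env.execute("Output Postgres Snapshot");
    }
}
{code}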
2. Create two tables in Postgres and insert some records into them.
3. Run the code from step 1.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)