Hi,

I have a stream reading from the postgres-cdc connector, version 3.1.0. I read from two tables:

flink.cleaned_migrations
public.cleaned

I convert the tables into a datastream, do some processing, then write the result to a sink at the end of my stream:

joined_table_result = joined_with_metadata.execute_insert(daily_sink_property_map['flink_table_name'])

This works well. However, I recently tried to add a second table which contains state reached in the middle of my stream:

continuous_metrics_table = table_env.execute_sql("SELECT f1, f2, f3 from joined_processed_table")
continuous_metrics_table.execute_insert(continuous_sink_property_map['flink_table_name'])

When I add this second sink, the postgres-cdc connector appears to add a second reader on the replication log, but with the same slot name. It behaves this way regardless of the sink connector I use, and the second reader appears in addition to the existing slot that is already allocated to the stream. This second reader of course cannot use the same replication slot, so the connector eventually times out.

Is this expected behavior from the connector? It seems strange that the connector would attempt to use a slot twice. I am using incremental snapshots, and I am passing a unique slot per table connector.

Logs below:

2024-06-14 09:23:59,600 INFO org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils [] - Postgres captured tables : flink.cleaned_migrations .
2024-06-14 09:23:59,603 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
2024-06-14 09:24:00,198 INFO org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils [] - Postgres captured tables : public.cleaned .
2024-06-14 09:24:00,199 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
2024-06-14 09:24:00,224 INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Creating initial offset context
2024-06-14 09:24:00,417 INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Read xlogStart at 'LSN{6/C9806378}' from transaction '73559679'
2024-06-14 09:24:00,712 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
2024-06-14 09:24:00,712 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success
2024-06-14 09:24:00,712 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}.
2024-06-14 09:24:00,714 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}]
2024-06-14 09:24:00,714 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2024-06-14 09:24:00,716 INFO org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator [] - The enumerator receives notice from subtask 0 for the stream split assignment.
2024-06-14 09:24:00,721 INFO org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext [] - PostgresConnectorConfig is
2024-06-14 09:24:00,847 INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Creating initial offset context
2024-06-14 09:24:01,000 INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Read xlogStart at 'LSN{6/C9806430}' from transaction '73559682'
2024-06-14 09:24:01,270 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
2024-06-14 09:24:01,271 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success
2024-06-14 09:24:01,271 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}.
2024-06-14 09:24:01,272 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}]
2024-06-14 09:24:01,273 INFO org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator [] - The enumerator receives notice from subtask 0 for the stream split assignment.
2024-06-14 09:24:01,274 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2024-06-14 09:24:01,276 INFO org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext [] - PostgresConnectorConfig is
2024-06-14 09:24:02,150 INFO io.debezium.connector.postgresql.PostgresObjectUtils [] - Creating a new replication connection for io.debezium.connector.postgresql.PostgresTaskContext@23f6230
2024-06-14 09:24:02,176 INFO org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask [] - Execute StreamSplitReadTask for split: StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}
2024-06-14 09:24:02,176 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Retrieved latest position from stored offset 'LSN{6/C98060F8}'
2024-06-14 09:24:02,177 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{6/C98060F8}'
2024-06-14 09:24:02,740 INFO io.debezium.connector.postgresql.PostgresObjectUtils [] - Creating a new replication connection for io.debezium.connector.postgresql.PostgresTaskContext@3d3d6b08
2024-06-14 09:24:02,742 INFO org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask [] - Execute StreamSplitReadTask for split: StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}
2024-06-14 09:24:02,742 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Retrieved latest position from stored offset 'LSN{6/C98061B0}'
2024-06-14 09:24:02,743 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{6/C98061B0}'
2024-06-14 09:24:02,779 INFO io.debezium.connector.postgresql.connection.PostgresConnection [] - Obtained valid replication slot ReplicationSlot [active=true, latestFlushedLsn=LSN{6/B8C36900}, catalogXmin=73461500]
2024-06-14 09:24:02,783 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
2024-06-14 09:24:03,317 INFO io.debezium.connector.postgresql.connection.PostgresConnection [] - Obtained valid replication slot ReplicationSlot [active=true, latestFlushedLsn=LSN{6/B8C36820}, catalogXmin=73461500]
2024-06-14 09:24:03,322 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
2024-06-14 09:24:03,488 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 1 over 6
2024-06-14 09:24:04,017 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 1 over 6
2024-06-14 09:24:13,649 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 2 over 6
2024-06-14 09:24:14,167 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 2 over 6
2024-06-14 09:24:23,799 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 3 over 6
2024-06-14 09:24:24,320 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 3 over 6
2024-06-14 09:24:30,481 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - First LSN 'LSN{6/C9807FD8}' received
2024-06-14 09:24:30,491 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - WAL resume position 'LSN{6/C9807FD8}' discovered
2024-06-14 09:24:30,494 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
2024-06-14 09:24:30,517 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - First LSN 'LSN{6/C9807FD8}' received
2024-06-14 09:24:30,517 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - WAL resume position 'LSN{6/C9807FD8}' discovered
2024-06-14 09:24:30,520 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
2024-06-14 09:24:31,138 INFO io.debezium.util.Threads [] - Requested thread factory for connector PostgresConnector, id = postgres_cdc_source named = keep-alive
2024-06-14 09:24:31,138 INFO io.debezium.util.Threads [] - Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-06-14 09:24:31,139 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Processing messages
2024-06-14 09:24:31,169 INFO io.debezium.util.Threads [] - Requested thread factory for connector PostgresConnector, id = postgres_cdc_source named = keep-alive
2024-06-14 09:24:31,169 INFO io.debezium.util.Threads [] - Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-06-14 09:24:31,169 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Processing messages
2024-06-14 09:24:33,958 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 4 over 6
2024-06-14 09:24:34,513 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 4 over 6
2024-06-14 09:24:44,113 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 5 over 6
2024-06-14 09:24:44,668 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 5 over 6
2024-06-14 09:24:54,332 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 6 over 6
2024-06-14 09:24:54,822 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 6 over 6
2024-06-14 09:24:55,788 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering
2024-06-14 09:24:55,798 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering
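
To make the retry pattern in these logs easier to see, here is a minimal sketch that groups the "Failed to start replication stream" warnings by their starting LSN. The function name summarize_retries and the regular expression are my own illustrative assumptions, written against the message text shown above; they are not part of Flink CDC or Debezium.

```python
import re
from collections import defaultdict

# Pattern written against the WARN message text in the logs above (an assumption,
# not an official Debezium log format specification).
RETRY_PATTERN = re.compile(
    r"Failed to start replication stream at (LSN\{[0-9A-Fa-f]+/[0-9A-Fa-f]+\}), "
    r"waiting for \S+ ms and retrying, attempt number (\d+) over (\d+)"
)


def summarize_retries(log_text):
    """Return {start LSN: highest retry attempt seen} for the retry warnings."""
    # Collapse all whitespace so entries wrapped across lines still match.
    normalized = " ".join(log_text.split())
    attempts = defaultdict(int)
    for match in RETRY_PATTERN.finditer(normalized):
        lsn = match.group(1)
        attempt = int(match.group(2))
        attempts[lsn] = max(attempts[lsn], attempt)
    return dict(attempts)


if __name__ == "__main__":
    # Two entries copied from the logs above; paste the full log to check all of them.
    sample = (
        "2024-06-14 09:24:54,332 WARN io.debezium.connector.postgresql.connection."
        "PostgresReplicationConnection [] - Failed to start replication stream at "
        "LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 6 over 6\n"
        "2024-06-14 09:24:54,822 WARN io.debezium.connector.postgresql.connection."
        "PostgresReplicationConnection [] - Failed to start replication stream at "
        "LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 6 over 6\n"
    )
    for lsn, highest in summarize_retries(sample).items():
        print(f"{lsn}: highest retry attempt {highest}")
```

Run against the logs above, this should report two distinct starting LSNs (6/C98060F8 and 6/C98061B0), each retried up to attempt 6, which is what suggests two readers are competing rather than one reader retrying.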