Hi,

I have a stream reading from the postgres-cdc connector, version 3.1.0. It
reads from two tables:

flink.cleaned_migrations
public.cleaned

I convert the tables into a datastream, do some processing, then write it
to a sink at the end of my stream:

    joined_table_result = joined_with_metadata.execute_insert(daily_sink_property_map['flink_table_name'])

This works well. However, I recently tried to add a second table, which
contains state reached in the middle of my stream:

    continuous_metrics_table = table_env.execute_sql("SELECT f1, f2, f3 FROM joined_processed_table")
    continuous_metrics_table.execute_insert(continuous_sink_property_map['flink_table_name'])

When I add this second sink, the postgres-cdc connector appears to start a
second reader on the replication log, using the same slot name as the first.
It behaves this way regardless of which sink connector I use, and the second
reader appears in addition to the one already attached to the slot for the
stream. The second reader of course cannot use a replication slot that is
already active, so the connector retries and eventually times out. Is this
expected behavior from the connector? It seems strange that the connector
would attempt to use a slot twice.

I am using incremental snapshots, and I am passing a unique slot per table
connector.
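
One thing worth checking, stated as an assumption rather than a confirmed
diagnosis: in the Table API each `execute_insert` call submits its own job, so
the two calls above would start two independent pipelines, each with its own
copy of the postgres-cdc source and therefore the same slot name. Below is a
minimal sketch of submitting both inserts as a single job through a
`StatementSet`. The helper name `submit_inserts_as_one_job` is hypothetical,
and whether this removes the duplicate reader with connector 3.1.0 is untested.

```python
def submit_inserts_as_one_job(table_env, inserts):
    """Submit several INSERTs as one Flink job (sketch, not verified on 3.1.0).

    table_env: an existing pyflink.table.TableEnvironment.
    inserts:   iterable of (sink_table_name, table) pairs, where each table is
               a pyflink.table.Table such as joined_with_metadata.
    """
    statement_set = table_env.create_statement_set()
    for sink_table_name, table in inserts:
        # add_insert only registers the statement; nothing is submitted yet.
        statement_set.add_insert(sink_table_name, table)
    # A single execute() call plans and submits all registered inserts together.
    return statement_set.execute()


# Usage with the names from the snippets above (not run here):
# result = submit_inserts_as_one_job(table_env, [
#     (daily_sink_property_map['flink_table_name'], joined_with_metadata),
#     (continuous_sink_property_map['flink_table_name'], continuous_metrics_table),
# ])
```

For this to work, `continuous_metrics_table` would need to be a `Table`, for
example one returned by `table_env.sql_query(...)`.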

Logs below:

2024-06-14 09:23:59,600 INFO  org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils [] - Postgres captured tables : flink.cleaned_migrations .

2024-06-14 09:23:59,603 INFO  io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed

2024-06-14 09:24:00,198 INFO  org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils [] - Postgres captured tables : public.cleaned .

2024-06-14 09:24:00,199 INFO  io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed

2024-06-14 09:24:00,224 INFO  io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Creating initial offset context

2024-06-14 09:24:00,417 INFO  io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Read xlogStart at 'LSN{6/C9806378}' from transaction '73559679'

2024-06-14 09:24:00,712 INFO  io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed

2024-06-14 09:24:00,712 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success

2024-06-14 09:24:00,712 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}.

2024-06-14 09:24:00,714 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}]

2024-06-14 09:24:00,714 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0

2024-06-14 09:24:00,716 INFO  org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator [] - The enumerator receives notice from subtask 0 for the stream split assignment.

2024-06-14 09:24:00,721 INFO  org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext [] - PostgresConnectorConfig is

2024-06-14 09:24:00,847 INFO  io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Creating initial offset context

2024-06-14 09:24:01,000 INFO  io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Read xlogStart at 'LSN{6/C9806430}' from transaction '73559682'

2024-06-14 09:24:01,270 INFO  io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed

2024-06-14 09:24:01,271 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success

2024-06-14 09:24:01,271 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}.

2024-06-14 09:24:01,272 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}]

2024-06-14 09:24:01,273 INFO  org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator [] - The enumerator receives notice from subtask 0 for the stream split assignment.

2024-06-14 09:24:01,274 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0

2024-06-14 09:24:01,276 INFO  org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext [] - PostgresConnectorConfig is

2024-06-14 09:24:02,150 INFO  io.debezium.connector.postgresql.PostgresObjectUtils [] - Creating a new replication connection for io.debezium.connector.postgresql.PostgresTaskContext@23f6230

2024-06-14 09:24:02,176 INFO  org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask [] - Execute StreamSplitReadTask for split: StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}

2024-06-14 09:24:02,176 INFO  io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Retrieved latest position from stored offset 'LSN{6/C98060F8}'

2024-06-14 09:24:02,177 INFO  io.debezium.connector.postgresql.connection.WalPositionLocator [] - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{6/C98060F8}'

2024-06-14 09:24:02,740 INFO  io.debezium.connector.postgresql.PostgresObjectUtils [] - Creating a new replication connection for io.debezium.connector.postgresql.PostgresTaskContext@3d3d6b08

2024-06-14 09:24:02,742 INFO  org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask [] - Execute StreamSplitReadTask for split: StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}

2024-06-14 09:24:02,742 INFO  io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Retrieved latest position from stored offset 'LSN{6/C98061B0}'

2024-06-14 09:24:02,743 INFO  io.debezium.connector.postgresql.connection.WalPositionLocator [] - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{6/C98061B0}'

2024-06-14 09:24:02,779 INFO  io.debezium.connector.postgresql.connection.PostgresConnection [] - Obtained valid replication slot ReplicationSlot [active=true, latestFlushedLsn=LSN{6/B8C36900}, catalogXmin=73461500]

2024-06-14 09:24:02,783 INFO  io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed

2024-06-14 09:24:03,317 INFO  io.debezium.connector.postgresql.connection.PostgresConnection [] - Obtained valid replication slot ReplicationSlot [active=true, latestFlushedLsn=LSN{6/B8C36820}, catalogXmin=73461500]

2024-06-14 09:24:03,322 INFO  io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed

2024-06-14 09:24:03,488 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 1 over 6

2024-06-14 09:24:04,017 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 1 over 6

2024-06-14 09:24:13,649 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 2 over 6

2024-06-14 09:24:14,167 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 2 over 6

2024-06-14 09:24:23,799 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 3 over 6

2024-06-14 09:24:24,320 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 3 over 6

2024-06-14 09:24:30,481 INFO  io.debezium.connector.postgresql.connection.WalPositionLocator [] - First LSN 'LSN{6/C9807FD8}' received

2024-06-14 09:24:30,491 INFO  io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - WAL resume position 'LSN{6/C9807FD8}' discovered

2024-06-14 09:24:30,494 INFO  io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed

2024-06-14 09:24:30,517 INFO  io.debezium.connector.postgresql.connection.WalPositionLocator [] - First LSN 'LSN{6/C9807FD8}' received

2024-06-14 09:24:30,517 INFO  io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - WAL resume position 'LSN{6/C9807FD8}' discovered

2024-06-14 09:24:30,520 INFO  io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed

2024-06-14 09:24:31,138 INFO  io.debezium.util.Threads [] - Requested thread factory for connector PostgresConnector, id = postgres_cdc_source named = keep-alive

2024-06-14 09:24:31,138 INFO  io.debezium.util.Threads [] - Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive

2024-06-14 09:24:31,139 INFO  io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Processing messages

2024-06-14 09:24:31,169 INFO  io.debezium.util.Threads [] - Requested thread factory for connector PostgresConnector, id = postgres_cdc_source named = keep-alive

2024-06-14 09:24:31,169 INFO  io.debezium.util.Threads [] - Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive

2024-06-14 09:24:31,169 INFO  io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Processing messages

2024-06-14 09:24:33,958 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 4 over 6

2024-06-14 09:24:34,513 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 4 over 6

2024-06-14 09:24:44,113 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 5 over 6

2024-06-14 09:24:44,668 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 5 over 6

2024-06-14 09:24:54,332 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 6 over 6

2024-06-14 09:24:54,822 WARN  io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 6 over 6

2024-06-14 09:24:55,788 INFO  io.debezium.connector.postgresql.connection.WalPositionLocator [] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering

2024-06-14 09:24:55,798 INFO  io.debezium.connector.postgresql.connection.WalPositionLocator [] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering
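
To make the retry pattern in the log above easier to see, here is a small
sketch that groups the "Failed to start replication stream" warnings by their
start LSN. The function name `count_retries_by_lsn` and the embedded
`SAMPLE_LOG` (three warnings copied from the log above) are only for
illustration, and the pattern assumes the Debezium message wording shown in
this particular log.

```python
import re
from collections import Counter

# Matches the Debezium warning as it appears in the log above; the wording is
# taken from this log only and may differ in other Debezium versions.
RETRY_PATTERN = re.compile(
    r"Failed to start replication stream at (LSN\{[0-9A-F]+/[0-9A-F]+\}), "
    r"waiting for \S+ ms and retrying, attempt number (\d+) over (\d+)"
)

# Three warnings copied from the log above, still wrapped as in the email.
SAMPLE_LOG = """
2024-06-14 09:24:03,488 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
PT10S ms and retrying, attempt number 1 over 6

2024-06-14 09:24:04,017 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
PT10S ms and retrying, attempt number 1 over 6

2024-06-14 09:24:13,649 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
PT10S ms and retrying, attempt number 2 over 6
"""


def count_retries_by_lsn(log_text: str) -> Counter:
    """Count retry warnings per start LSN in a block of log text."""
    # Collapse whitespace first so entries wrapped across lines still match.
    normalized = " ".join(log_text.split())
    return Counter(m.group(1) for m in RETRY_PATTERN.finditer(normalized))


if __name__ == "__main__":
    for lsn, warnings in sorted(count_retries_by_lsn(SAMPLE_LOG).items()):
        print(lsn, warnings)
```

Run over the full log above, this gives six warnings for each of the two start
LSNs, `LSN{6/C98060F8}` and `LSN{6/C98061B0}`.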
