Hi,
I have a stream that reads from the postgres-cdc connector, version 3.1.0.
I read from two tables:
flink.cleaned_migrations
public.cleaned
I convert the tables into a datastream, do some processing, then write it
to a sink at the end of my stream:
joined_table_result = joined_with_metadata.execute_insert(daily_sink_property_map['flink_table_name'])
This works well. However, I recently tried to add a second table, which
contains state reached in the middle of my stream:
continuous_metrics_table = table_env.sql_query("SELECT f1, f2, f3 FROM joined_processed_table")
continuous_metrics_table.execute_insert(continuous_sink_property_map['flink_table_name'])
When I add this second sink, the postgres-cdc connector appears to start a
second reader on the replication log, using the same slot name as the first.
It seems to behave this way regardless of the sink connector I use, and the
second reader seems to be created in addition to the one that already holds
the slot allocated to the stream. The second reader of course cannot use the
same replication slot, so the connector eventually times out. Is this
expected behavior from the connector? It seems strange that it would attempt
to use a slot twice.
I am using incremental snapshots, and I am passing a unique slot name to
each table connector.
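One possible explanation, offered as an assumption and not something the
logs below confirm: in the Table API each execute_insert call submits its
own job, so two calls would build the source pipeline twice, and both jobs
would then try to open the same replication slot. A minimal PyFlink sketch
of grouping both inserts into one StatementSet, so they are planned and
submitted together, might look like the following. The helper name
submit_sinks_as_one_job and the shape of its inserts argument are
hypothetical; create_statement_set, add_insert and execute are the PyFlink
Table API calls. Whether this removes the duplicate reader in a pipeline
that round-trips through the DataStream API is untested here.

```python
def submit_sinks_as_one_job(table_env, inserts):
    """Submit several INSERTs as a single Flink job (hypothetical helper).

    table_env: a pyflink.table.TableEnvironment or StreamTableEnvironment.
    inserts:   iterable of (sink_table_name, table) pairs; this argument
               shape is an assumption made for the sketch.
    """
    statement_set = table_env.create_statement_set()
    for sink_table_name, table in inserts:
        # Queue the insert; nothing is submitted until execute() is called.
        statement_set.add_insert(sink_table_name, table)
    # A single execute() call submits all queued inserts together.
    return statement_set.execute()


# Possible usage with the names from this message, replacing the two
# separate execute_insert calls:
#
# submit_sinks_as_one_job(table_env, [
#     (daily_sink_property_map['flink_table_name'], joined_with_metadata),
#     (continuous_sink_property_map['flink_table_name'], continuous_metrics_table),
# ])
```

The design choice here is only to replace the two independent submissions
with one; the tables and sink definitions themselves stay unchanged.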
Logs below:
2024-06-14 09:23:59,600 INFO
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
[] - Postgres captured tables : flink.cleaned_migrations .
2024-06-14 09:23:59,603 INFO io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed
2024-06-14 09:24:00,198 INFO
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
[] - Postgres captured tables : public.cleaned .
2024-06-14 09:24:00,199 INFO io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed
2024-06-14 09:24:00,224 INFO
io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
[] - Creating initial offset context
2024-06-14 09:24:00,417 INFO
io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
[] - Read xlogStart at 'LSN{6/C9806378}' from transaction '73559679'
2024-06-14 09:24:00,712 INFO io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed
2024-06-14 09:24:00,712 INFO
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
[] - Source reader 0 discovers table schema for stream split stream-split
success
2024-06-14 09:24:00,712 INFO
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
[] - Source reader 0 received the stream split :
StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8},
txId=73559674, lastCommitTs=-9223372036854775808],
endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
lastCommitTs=-9223372036853775810], isSuspended=false}.
2024-06-14 09:24:00,714 INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase
[] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674,
lastCommitTs=-9223372036854775808],
endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
lastCommitTs=-9223372036853775810], isSuspended=false}]
2024-06-14 09:24:00,714 INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
[] - Starting split fetcher 0
2024-06-14 09:24:00,716 INFO
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator
[] - The enumerator receives notice from subtask 0 for the stream split
assignment.
2024-06-14 09:24:00,721 INFO
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext
[] - PostgresConnectorConfig is
2024-06-14 09:24:00,847 INFO
io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
[] - Creating initial offset context
2024-06-14 09:24:01,000 INFO
io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
[] - Read xlogStart at 'LSN{6/C9806430}' from transaction '73559682'
2024-06-14 09:24:01,270 INFO io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed
2024-06-14 09:24:01,271 INFO
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
[] - Source reader 0 discovers table schema for stream split stream-split
success
2024-06-14 09:24:01,271 INFO
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
[] - Source reader 0 received the stream split :
StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0},
txId=73559676, lastCommitTs=-9223372036854775808],
endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
lastCommitTs=-9223372036853775810], isSuspended=false}.
2024-06-14 09:24:01,272 INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase
[] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676,
lastCommitTs=-9223372036854775808],
endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
lastCommitTs=-9223372036853775810], isSuspended=false}]
2024-06-14 09:24:01,273 INFO
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator
[] - The enumerator receives notice from subtask 0 for the stream split
assignment.
2024-06-14 09:24:01,274 INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
[] - Starting split fetcher 0
2024-06-14 09:24:01,276 INFO
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext
[] - PostgresConnectorConfig is
2024-06-14 09:24:02,150 INFO
io.debezium.connector.postgresql.PostgresObjectUtils
[] - Creating a new replication connection for
io.debezium.connector.postgresql.PostgresTaskContext@23f6230
2024-06-14 09:24:02,176 INFO
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask
[] - Execute StreamSplitReadTask for split:
StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8},
txId=73559674, lastCommitTs=-9223372036854775808],
endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
lastCommitTs=-9223372036853775810], isSuspended=false}
2024-06-14 09:24:02,176 INFO
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
[] - Retrieved latest position from stored offset 'LSN{6/C98060F8}'
2024-06-14 09:24:02,177 INFO
io.debezium.connector.postgresql.connection.WalPositionLocator
[] - Looking for WAL restart position for last commit LSN 'null' and last
change LSN 'LSN{6/C98060F8}'
2024-06-14 09:24:02,740 INFO
io.debezium.connector.postgresql.PostgresObjectUtils
[] - Creating a new replication connection for
io.debezium.connector.postgresql.PostgresTaskContext@3d3d6b08
2024-06-14 09:24:02,742 INFO
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask
[] - Execute StreamSplitReadTask for split:
StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0},
txId=73559676, lastCommitTs=-9223372036854775808],
endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
lastCommitTs=-9223372036853775810], isSuspended=false}
2024-06-14 09:24:02,742 INFO
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
[] - Retrieved latest position from stored offset 'LSN{6/C98061B0}'
2024-06-14 09:24:02,743 INFO
io.debezium.connector.postgresql.connection.WalPositionLocator
[] - Looking for WAL restart position for last commit LSN 'null' and last
change LSN 'LSN{6/C98061B0}'
2024-06-14 09:24:02,779 INFO
io.debezium.connector.postgresql.connection.PostgresConnection
[] - Obtained valid replication slot ReplicationSlot [active=true,
latestFlushedLsn=LSN{6/B8C36900}, catalogXmin=73461500]
2024-06-14 09:24:02,783 INFO io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed
2024-06-14 09:24:03,317 INFO
io.debezium.connector.postgresql.connection.PostgresConnection
[] - Obtained valid replication slot ReplicationSlot [active=true,
latestFlushedLsn=LSN{6/B8C36820}, catalogXmin=73461500]
2024-06-14 09:24:03,322 INFO io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed
2024-06-14 09:24:03,488 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
PT10S ms and retrying, attempt number 1 over 6
2024-06-14 09:24:04,017 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
PT10S ms and retrying, attempt number 1 over 6
2024-06-14 09:24:13,649 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
PT10S ms and retrying, attempt number 2 over 6
2024-06-14 09:24:14,167 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
PT10S ms and retrying, attempt number 2 over 6
2024-06-14 09:24:23,799 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
PT10S ms and retrying, attempt number 3 over 6
2024-06-14 09:24:24,320 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
PT10S ms and retrying, attempt number 3 over 6
2024-06-14 09:24:30,481 INFO
io.debezium.connector.postgresql.connection.WalPositionLocator
[] - First LSN 'LSN{6/C9807FD8}' received
2024-06-14 09:24:30,491 INFO
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
[] - WAL resume position 'LSN{6/C9807FD8}' discovered
2024-06-14 09:24:30,494 INFO io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed
2024-06-14 09:24:30,517 INFO
io.debezium.connector.postgresql.connection.WalPositionLocator
[] - First LSN 'LSN{6/C9807FD8}' received
2024-06-14 09:24:30,517 INFO
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
[] - WAL resume position 'LSN{6/C9807FD8}' discovered
2024-06-14 09:24:30,520 INFO io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed
2024-06-14 09:24:31,138 INFO io.debezium.util.Threads
[] - Requested thread factory for connector
PostgresConnector, id = postgres_cdc_source named = keep-alive
2024-06-14 09:24:31,138 INFO io.debezium.util.Threads
[] - Creating thread
debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-06-14 09:24:31,139 INFO
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
[] - Processing messages
2024-06-14 09:24:31,169 INFO io.debezium.util.Threads
[] - Requested thread factory for connector
PostgresConnector, id = postgres_cdc_source named = keep-alive
2024-06-14 09:24:31,169 INFO io.debezium.util.Threads
[] - Creating thread
debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-06-14 09:24:31,169 INFO
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
[] - Processing messages
2024-06-14 09:24:33,958 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
PT10S ms and retrying, attempt number 4 over 6
2024-06-14 09:24:34,513 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
PT10S ms and retrying, attempt number 4 over 6
2024-06-14 09:24:44,113 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
PT10S ms and retrying, attempt number 5 over 6
2024-06-14 09:24:44,668 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
PT10S ms and retrying, attempt number 5 over 6
2024-06-14 09:24:54,332 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
PT10S ms and retrying, attempt number 6 over 6
2024-06-14 09:24:54,822 WARN
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
[] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
PT10S ms and retrying, attempt number 6 over 6
2024-06-14 09:24:55,788 INFO
io.debezium.connector.postgresql.connection.WalPositionLocator
[] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering
2024-06-14 09:24:55,798 INFO
io.debezium.connector.postgresql.connection.WalPositionLocator
[] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering