These sinks share the same source. The pipeline that works looks something
like this:

table1 -> process1 -> process2 -> sink2

When I change it to this:

table1 -> process1 -> process2 -> sink2
                             `--> sink1

I get the errors described in my first message: it appears that a second
process is created, and it attempts to use the already-active slot a second time.
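
A hedged note on a likely cause. In the Table API, each `Table.execute_insert()` call, and each `TableEnvironment.execute_sql()` call on a SELECT, submits its own Flink job. Every job creates its own instances of the postgres-cdc sources, with the same configured `slot.name`. Grouping both INSERTs into one `StatementSet` submits a single job instead, so both sinks can be fed from one instance of each source. The sketch below is written against PyFlink's `TableEnvironment.sql_query`, `create_statement_set`, `StatementSet.add_insert` and `StatementSet.execute`. It assumes that `joined_with_metadata` and the view `joined_processed_table` live in the same `table_env` and that both sink tables are already registered. The helper name `insert_into_both_sinks` and its parameters are hypothetical.

```python
def insert_into_both_sinks(table_env, joined_with_metadata, daily_sink, continuous_sink):
    """Write to both sinks from ONE Flink job (hypothetical helper).

    table_env: the TableEnvironment the pipeline was built in.
    joined_with_metadata: the Table that currently feeds the daily sink.
    daily_sink, continuous_sink: names of already-registered sink tables.
    """
    # sql_query() only builds a Table object. execute_sql("SELECT ...")
    # would immediately submit a separate job with its own CDC readers.
    continuous_metrics = table_env.sql_query(
        "SELECT f1, f2, f3 FROM joined_processed_table"
    )

    # Collect both INSERTs here instead of calling execute_insert() twice,
    # which would submit two independent jobs.
    stmt_set = table_env.create_statement_set()
    stmt_set.add_insert(daily_sink, joined_with_metadata)
    stmt_set.add_insert(continuous_sink, continuous_metrics)

    # A single execute() submits one job that contains both sinks and
    # returns that job's TableResult.
    return stmt_set.execute()
```

You would call it with `daily_sink_property_map['flink_table_name']` and `continuous_sink_property_map['flink_table_name']` as the two sink names. Calling `.wait()` on the returned `TableResult` blocks until the job ends. If two separate jobs really are wanted, each job's sources would instead need their own `slot.name` values, as suggested in the reply below.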

On Mon, Jun 17, 2024 at 1:58 AM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi David,
> > When I add this second sink, the postgres-cdc connector appears to add a
> > second reader from the replication log, but with the same slot name.
>
> I don't understand what you mean by adding a second sink. Do they share
> the same source, or does each have a separate pipeline? If the former, you
> can share the same source between the two sinks, in which case one
> replication slot is sufficient. If the latter, and you want each sink to
> have its own source, you can set a different slot name for each source
> (the option name is slot.name[1]).
>
> [1]
> https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/postgres-cdc/#connector-options
>
> On Sat, Jun 15, 2024 at 12:40 AM David Bryson <d...@eplant.bio> wrote:
>
>> Hi,
>>
>> I have a stream reading from the postgres-cdc connector, version 3.1.0. I
>> read from two tables:
>>
>> flink.cleaned_migrations
>> public.cleaned
>>
>> I convert the tables into a DataStream, do some processing, then write the
>> result to a sink at the end of my stream:
>>
>>     joined_table_result = joined_with_metadata.execute_insert(daily_sink_property_map['flink_table_name'])
>>
>> This works well. However, I recently tried to add a second sink table that
>> captures state reached in the middle of my stream:
>>
>>     continuous_metrics_table = table_env.execute_sql("SELECT f1, f2, f3 from joined_processed_table")
>>
>>     continuous_metrics_table.execute_insert(continuous_sink_property_map['flink_table_name'])
>>
>> When I add this second sink, the postgres-cdc connector appears to add a
>> second reader from the replication log, but with the same slot name. This
>> seems to happen regardless of which sink connector I use, and the second
>> reader is created in addition to the one already holding the slot allocated
>> to the stream. The second reader of course cannot use the same replication
>> slot, so the connector eventually times out. Is this expected behavior from
>> the connector? It seems strange that the connector would attempt to use a
>> slot twice.
>>
>> I am using incremental snapshots, and I am passing a unique slot name to
>> each table's connector.
>>
>> Logs below:
>>
>> 2024-06-14 09:23:59,600 INFO  
>> org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
>> [] - Postgres captured tables : flink.cleaned_migrations .
>>
>> 2024-06-14 09:23:59,603 INFO  io.debezium.jdbc.JdbcConnection
>>                   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:00,198 INFO  
>> org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
>> [] - Postgres captured tables : public.cleaned .
>>
>> 2024-06-14 09:24:00,199 INFO  io.debezium.jdbc.JdbcConnection
>>                   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:00,224 INFO  
>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>> [] - Creating initial offset context
>>
>> 2024-06-14 09:24:00,417 INFO  
>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>> [] - Read xlogStart at 'LSN{6/C9806378}' from transaction '73559679'
>>
>> 2024-06-14 09:24:00,712 INFO  io.debezium.jdbc.JdbcConnection
>>                   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:00,712 INFO  
>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>> [] - Source reader 0 discovers table schema for stream split stream-split
>> success
>>
>> 2024-06-14 09:24:00,712 INFO  
>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>> [] - Source reader 0 received the stream split :
>> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8},
>> txId=73559674, lastCommitTs=-9223372036854775808],
>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>> lastCommitTs=-9223372036853775810], isSuspended=false}.
>>
>> 2024-06-14 09:24:00,714 INFO  
>> org.apache.flink.connector.base.source.reader.SourceReaderBase
>> [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
>> offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674,
>> lastCommitTs=-9223372036854775808],
>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>> lastCommitTs=-9223372036853775810], isSuspended=false}]
>>
>> 2024-06-14 09:24:00,714 INFO  
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
>> [] - Starting split fetcher 0
>>
>> 2024-06-14 09:24:00,716 INFO  
>> org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator
>> [] - The enumerator receives notice from subtask 0 for the stream split
>> assignment.
>>
>> 2024-06-14 09:24:00,721 INFO  
>> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext
>> [] - PostgresConnectorConfig is
>>
>> 2024-06-14 09:24:00,847 INFO  
>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>> [] - Creating initial offset context
>>
>> 2024-06-14 09:24:01,000 INFO  
>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>> [] - Read xlogStart at 'LSN{6/C9806430}' from transaction '73559682'
>>
>> 2024-06-14 09:24:01,270 INFO  io.debezium.jdbc.JdbcConnection
>>                   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:01,271 INFO  
>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>> [] - Source reader 0 discovers table schema for stream split stream-split
>> success
>>
>> 2024-06-14 09:24:01,271 INFO  
>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>> [] - Source reader 0 received the stream split :
>> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0},
>> txId=73559676, lastCommitTs=-9223372036854775808],
>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>> lastCommitTs=-9223372036853775810], isSuspended=false}.
>>
>> 2024-06-14 09:24:01,272 INFO  
>> org.apache.flink.connector.base.source.reader.SourceReaderBase
>> [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
>> offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676,
>> lastCommitTs=-9223372036854775808],
>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>> lastCommitTs=-9223372036853775810], isSuspended=false}]
>>
>> 2024-06-14 09:24:01,273 INFO  
>> org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator
>> [] - The enumerator receives notice from subtask 0 for the stream split
>> assignment.
>>
>> 2024-06-14 09:24:01,274 INFO  
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
>> [] - Starting split fetcher 0
>>
>> 2024-06-14 09:24:01,276 INFO  
>> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext
>> [] - PostgresConnectorConfig is
>>
>> 2024-06-14 09:24:02,150 INFO  
>> io.debezium.connector.postgresql.PostgresObjectUtils
>>         [] - Creating a new replication connection for
>> io.debezium.connector.postgresql.PostgresTaskContext@23f6230
>>
>> 2024-06-14 09:24:02,176 INFO  
>> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask
>> [] - Execute StreamSplitReadTask for split:
>> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8},
>> txId=73559674, lastCommitTs=-9223372036854775808],
>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>> lastCommitTs=-9223372036853775810], isSuspended=false}
>>
>> 2024-06-14 09:24:02,176 INFO  
>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>> [] - Retrieved latest position from stored offset 'LSN{6/C98060F8}'
>>
>> 2024-06-14 09:24:02,177 INFO  
>> io.debezium.connector.postgresql.connection.WalPositionLocator
>> [] - Looking for WAL restart position for last commit LSN 'null' and last
>> change LSN 'LSN{6/C98060F8}'
>>
>> 2024-06-14 09:24:02,740 INFO  
>> io.debezium.connector.postgresql.PostgresObjectUtils
>>         [] - Creating a new replication connection for
>> io.debezium.connector.postgresql.PostgresTaskContext@3d3d6b08
>>
>> 2024-06-14 09:24:02,742 INFO  
>> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask
>> [] - Execute StreamSplitReadTask for split:
>> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0},
>> txId=73559676, lastCommitTs=-9223372036854775808],
>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>> lastCommitTs=-9223372036853775810], isSuspended=false}
>>
>> 2024-06-14 09:24:02,742 INFO  
>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>> [] - Retrieved latest position from stored offset 'LSN{6/C98061B0}'
>>
>> 2024-06-14 09:24:02,743 INFO  
>> io.debezium.connector.postgresql.connection.WalPositionLocator
>> [] - Looking for WAL restart position for last commit LSN 'null' and last
>> change LSN 'LSN{6/C98061B0}'
>>
>> 2024-06-14 09:24:02,779 INFO  
>> io.debezium.connector.postgresql.connection.PostgresConnection
>> [] - Obtained valid replication slot ReplicationSlot [active=true,
>> latestFlushedLsn=LSN{6/B8C36900}, catalogXmin=73461500]
>>
>> 2024-06-14 09:24:02,783 INFO  io.debezium.jdbc.JdbcConnection
>>                   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:03,317 INFO  
>> io.debezium.connector.postgresql.connection.PostgresConnection
>> [] - Obtained valid replication slot ReplicationSlot [active=true,
>> latestFlushedLsn=LSN{6/B8C36820}, catalogXmin=73461500]
>>
>> 2024-06-14 09:24:03,322 INFO  io.debezium.jdbc.JdbcConnection
>>                   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:03,488 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>> PT10S ms and retrying, attempt number 1 over 6
>>
>> 2024-06-14 09:24:04,017 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>> PT10S ms and retrying, attempt number 1 over 6
>>
>> 2024-06-14 09:24:13,649 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>> PT10S ms and retrying, attempt number 2 over 6
>>
>> 2024-06-14 09:24:14,167 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>> PT10S ms and retrying, attempt number 2 over 6
>>
>> 2024-06-14 09:24:23,799 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>> PT10S ms and retrying, attempt number 3 over 6
>>
>> 2024-06-14 09:24:24,320 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>> PT10S ms and retrying, attempt number 3 over 6
>>
>> 2024-06-14 09:24:30,481 INFO  
>> io.debezium.connector.postgresql.connection.WalPositionLocator
>> [] - First LSN 'LSN{6/C9807FD8}' received
>>
>> 2024-06-14 09:24:30,491 INFO  
>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>> [] - WAL resume position 'LSN{6/C9807FD8}' discovered
>>
>> 2024-06-14 09:24:30,494 INFO  io.debezium.jdbc.JdbcConnection
>>                   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:30,517 INFO  
>> io.debezium.connector.postgresql.connection.WalPositionLocator
>> [] - First LSN 'LSN{6/C9807FD8}' received
>>
>> 2024-06-14 09:24:30,517 INFO  
>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>> [] - WAL resume position 'LSN{6/C9807FD8}' discovered
>>
>> 2024-06-14 09:24:30,520 INFO  io.debezium.jdbc.JdbcConnection
>>                   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:31,138 INFO  io.debezium.util.Threads
>>                   [] - Requested thread factory for connector
>> PostgresConnector, id = postgres_cdc_source named = keep-alive
>>
>> 2024-06-14 09:24:31,138 INFO  io.debezium.util.Threads
>>                   [] - Creating thread
>> debezium-postgresconnector-postgres_cdc_source-keep-alive
>>
>> 2024-06-14 09:24:31,139 INFO  
>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>> [] - Processing messages
>>
>> 2024-06-14 09:24:31,169 INFO  io.debezium.util.Threads
>>                   [] - Requested thread factory for connector
>> PostgresConnector, id = postgres_cdc_source named = keep-alive
>>
>> 2024-06-14 09:24:31,169 INFO  io.debezium.util.Threads
>>                   [] - Creating thread
>> debezium-postgresconnector-postgres_cdc_source-keep-alive
>>
>> 2024-06-14 09:24:31,169 INFO  
>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>> [] - Processing messages
>>
>> 2024-06-14 09:24:33,958 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>> PT10S ms and retrying, attempt number 4 over 6
>>
>> 2024-06-14 09:24:34,513 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>> PT10S ms and retrying, attempt number 4 over 6
>>
>> 2024-06-14 09:24:44,113 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>> PT10S ms and retrying, attempt number 5 over 6
>>
>> 2024-06-14 09:24:44,668 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>> PT10S ms and retrying, attempt number 5 over 6
>>
>> 2024-06-14 09:24:54,332 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>> PT10S ms and retrying, attempt number 6 over 6
>>
>> 2024-06-14 09:24:54,822 WARN  
>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>> PT10S ms and retrying, attempt number 6 over 6
>>
>> 2024-06-14 09:24:55,788 INFO  
>> io.debezium.connector.postgresql.connection.WalPositionLocator
>> [] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering
>>
>> 2024-06-14 09:24:55,798 INFO  
>> io.debezium.connector.postgresql.connection.WalPositionLocator
>> [] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering
>>
>
