Hi David,
In your modified pipeline, a single source reading table1 is sufficient,
with both sink1 and process2 fed from the same process1 output. However,
based on your log, it appears that two sources have been generated. Could
you share the execution graph from the Flink UI?

Best,
Hongshun

On Mon, Jun 17, 2024 at 11:40 PM David Bryson <d...@eplant.bio> wrote:

> These sinks share the same source.  The pipeline that works looks something
> like this:
>
> table1 -> process1 -> process2 -> sink2
>
> When I change it to this:
>
> table1 -> process1 -> process2 -> sink2
>                              `--> sink1
>
> I get the errors described, where it appears that a second process is
> created that attempts to use the current slot twice.
>
> On Mon, Jun 17, 2024 at 1:58 AM Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
>> Hi David,
>> > When I add this second sink, the postgres-cdc connector appears to add
>> a second reader from the replication log, but with the same slot name.
>>
>> I don't understand what you mean by adding a second sink. Do the sinks share
>> the same source, or does each have a separate pipeline? If the former, you
>> can share one source between the two sinks, in which case a single
>> replication slot is sufficient. If the latter, and you want each sink to
>> have its own source, you can set a different slot name for each source (the
>> option name is slot.name[1]).
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/postgres-cdc/#connector-options
>>
>> On Sat, Jun 15, 2024 at 12:40 AM David Bryson <d...@eplant.bio> wrote:
>>
>>> Hi,
>>>
>>> I have a stream reading from postgres-cdc connector version 3.1.0. I
>>> read from two tables:
>>>
>>> flink.cleaned_migrations
>>> public.cleaned
>>>
>>> I convert the tables into a datastream, do some processing, then write
>>> it to a sink at the end of my stream:
>>>
>>>     joined_table_result = joined_with_metadata.execute_insert(
>>>         daily_sink_property_map['flink_table_name'])
>>>
>>> This works well, however I recently tried to add a second table which
>>> contains state reached in the middle of my stream:
>>>
>>>     continuous_metrics_table = table_env.execute_sql(
>>>         "SELECT f1, f2, f3 from joined_processed_table")
>>>
>>>     continuous_metrics_table.execute_insert(
>>>         continuous_sink_property_map['flink_table_name'])
>>>
>>> When I add this second sink, the postgres-cdc connector appears to add a
>>> second reader from the replication log, but with the same slot name. It
>>> seems to behave this way regardless of the sink connector I use, and the
>>> second reader appears in addition to the one already using the slot
>>> allocated to the stream.  This second reader of course cannot use the
>>> same replication slot,
>>> and so the connector eventually times out.  Is this expected behavior from
>>> the connector? It seems strange the connector would attempt to use a slot
>>> twice.
>>>
>>> I am using incremental snapshots, and I am passing a unique slot per
>>> table connector.
>>>
>>> Logs below:
>>>
>>> 2024-06-14 09:23:59,600 INFO  
>>> org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
>>> [] - Postgres captured tables : flink.cleaned_migrations .
>>>
>>> 2024-06-14 09:23:59,603 INFO  io.debezium.jdbc.JdbcConnection
>>>                     [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:00,198 INFO  
>>> org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
>>> [] - Postgres captured tables : public.cleaned .
>>>
>>> 2024-06-14 09:24:00,199 INFO  io.debezium.jdbc.JdbcConnection
>>>                     [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:00,224 INFO  
>>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>>> [] - Creating initial offset context
>>>
>>> 2024-06-14 09:24:00,417 INFO  
>>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>>> [] - Read xlogStart at 'LSN{6/C9806378}' from transaction '73559679'
>>>
>>> 2024-06-14 09:24:00,712 INFO  io.debezium.jdbc.JdbcConnection
>>>                     [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:00,712 INFO  
>>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>>> [] - Source reader 0 discovers table schema for stream split stream-split
>>> success
>>>
>>> 2024-06-14 09:24:00,712 INFO  
>>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>>> [] - Source reader 0 received the stream split :
>>> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8},
>>> txId=73559674, lastCommitTs=-9223372036854775808],
>>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>>> lastCommitTs=-9223372036853775810], isSuspended=false}.
>>>
>>> 2024-06-14 09:24:00,714 INFO  
>>> org.apache.flink.connector.base.source.reader.SourceReaderBase
>>> [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
>>> offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674,
>>> lastCommitTs=-9223372036854775808],
>>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>>> lastCommitTs=-9223372036853775810], isSuspended=false}]
>>>
>>> 2024-06-14 09:24:00,714 INFO  
>>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
>>> [] - Starting split fetcher 0
>>>
>>> 2024-06-14 09:24:00,716 INFO  
>>> org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator
>>> [] - The enumerator receives notice from subtask 0 for the stream split
>>> assignment.
>>>
>>> 2024-06-14 09:24:00,721 INFO  
>>> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext
>>> [] - PostgresConnectorConfig is
>>>
>>> 2024-06-14 09:24:00,847 INFO  
>>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>>> [] - Creating initial offset context
>>>
>>> 2024-06-14 09:24:01,000 INFO  
>>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>>> [] - Read xlogStart at 'LSN{6/C9806430}' from transaction '73559682'
>>>
>>> 2024-06-14 09:24:01,270 INFO  io.debezium.jdbc.JdbcConnection
>>>                     [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:01,271 INFO  
>>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>>> [] - Source reader 0 discovers table schema for stream split stream-split
>>> success
>>>
>>> 2024-06-14 09:24:01,271 INFO  
>>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>>> [] - Source reader 0 received the stream split :
>>> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0},
>>> txId=73559676, lastCommitTs=-9223372036854775808],
>>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>>> lastCommitTs=-9223372036853775810], isSuspended=false}.
>>>
>>> 2024-06-14 09:24:01,272 INFO  
>>> org.apache.flink.connector.base.source.reader.SourceReaderBase
>>> [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
>>> offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676,
>>> lastCommitTs=-9223372036854775808],
>>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>>> lastCommitTs=-9223372036853775810], isSuspended=false}]
>>>
>>> 2024-06-14 09:24:01,273 INFO  
>>> org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator
>>> [] - The enumerator receives notice from subtask 0 for the stream split
>>> assignment.
>>>
>>> 2024-06-14 09:24:01,274 INFO  
>>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
>>> [] - Starting split fetcher 0
>>>
>>> 2024-06-14 09:24:01,276 INFO  
>>> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext
>>> [] - PostgresConnectorConfig is
>>>
>>> 2024-06-14 09:24:02,150 INFO  
>>> io.debezium.connector.postgresql.PostgresObjectUtils
>>>         [] - Creating a new replication connection for
>>> io.debezium.connector.postgresql.PostgresTaskContext@23f6230
>>>
>>> 2024-06-14 09:24:02,176 INFO  
>>> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask
>>> [] - Execute StreamSplitReadTask for split:
>>> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8},
>>> txId=73559674, lastCommitTs=-9223372036854775808],
>>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>>> lastCommitTs=-9223372036853775810], isSuspended=false}
>>>
>>> 2024-06-14 09:24:02,176 INFO  
>>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>>> [] - Retrieved latest position from stored offset 'LSN{6/C98060F8}'
>>>
>>> 2024-06-14 09:24:02,177 INFO  
>>> io.debezium.connector.postgresql.connection.WalPositionLocator
>>> [] - Looking for WAL restart position for last commit LSN 'null' and last
>>> change LSN 'LSN{6/C98060F8}'
>>>
>>> 2024-06-14 09:24:02,740 INFO  
>>> io.debezium.connector.postgresql.PostgresObjectUtils
>>>         [] - Creating a new replication connection for
>>> io.debezium.connector.postgresql.PostgresTaskContext@3d3d6b08
>>>
>>> 2024-06-14 09:24:02,742 INFO  
>>> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask
>>> [] - Execute StreamSplitReadTask for split:
>>> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0},
>>> txId=73559676, lastCommitTs=-9223372036854775808],
>>> endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
>>> lastCommitTs=-9223372036853775810], isSuspended=false}
>>>
>>> 2024-06-14 09:24:02,742 INFO  
>>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>>> [] - Retrieved latest position from stored offset 'LSN{6/C98061B0}'
>>>
>>> 2024-06-14 09:24:02,743 INFO  
>>> io.debezium.connector.postgresql.connection.WalPositionLocator
>>> [] - Looking for WAL restart position for last commit LSN 'null' and last
>>> change LSN 'LSN{6/C98061B0}'
>>>
>>> 2024-06-14 09:24:02,779 INFO  
>>> io.debezium.connector.postgresql.connection.PostgresConnection
>>> [] - Obtained valid replication slot ReplicationSlot [active=true,
>>> latestFlushedLsn=LSN{6/B8C36900}, catalogXmin=73461500]
>>>
>>> 2024-06-14 09:24:02,783 INFO  io.debezium.jdbc.JdbcConnection
>>>                     [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:03,317 INFO  
>>> io.debezium.connector.postgresql.connection.PostgresConnection
>>> [] - Obtained valid replication slot ReplicationSlot [active=true,
>>> latestFlushedLsn=LSN{6/B8C36820}, catalogXmin=73461500]
>>>
>>> 2024-06-14 09:24:03,322 INFO  io.debezium.jdbc.JdbcConnection
>>>                     [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:03,488 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>>> PT10S ms and retrying, attempt number 1 over 6
>>>
>>> 2024-06-14 09:24:04,017 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>>> PT10S ms and retrying, attempt number 1 over 6
>>>
>>> 2024-06-14 09:24:13,649 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>>> PT10S ms and retrying, attempt number 2 over 6
>>>
>>> 2024-06-14 09:24:14,167 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>>> PT10S ms and retrying, attempt number 2 over 6
>>>
>>> 2024-06-14 09:24:23,799 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>>> PT10S ms and retrying, attempt number 3 over 6
>>>
>>> 2024-06-14 09:24:24,320 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>>> PT10S ms and retrying, attempt number 3 over 6
>>>
>>> 2024-06-14 09:24:30,481 INFO  
>>> io.debezium.connector.postgresql.connection.WalPositionLocator
>>> [] - First LSN 'LSN{6/C9807FD8}' received
>>>
>>> 2024-06-14 09:24:30,491 INFO  
>>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>>> [] - WAL resume position 'LSN{6/C9807FD8}' discovered
>>>
>>> 2024-06-14 09:24:30,494 INFO  io.debezium.jdbc.JdbcConnection
>>>                     [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:30,517 INFO  
>>> io.debezium.connector.postgresql.connection.WalPositionLocator
>>> [] - First LSN 'LSN{6/C9807FD8}' received
>>>
>>> 2024-06-14 09:24:30,517 INFO  
>>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>>> [] - WAL resume position 'LSN{6/C9807FD8}' discovered
>>>
>>> 2024-06-14 09:24:30,520 INFO  io.debezium.jdbc.JdbcConnection
>>>                     [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:31,138 INFO  io.debezium.util.Threads
>>>                     [] - Requested thread factory for connector
>>> PostgresConnector, id = postgres_cdc_source named = keep-alive
>>>
>>> 2024-06-14 09:24:31,138 INFO  io.debezium.util.Threads
>>>                     [] - Creating thread
>>> debezium-postgresconnector-postgres_cdc_source-keep-alive
>>>
>>> 2024-06-14 09:24:31,139 INFO  
>>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>>> [] - Processing messages
>>>
>>> 2024-06-14 09:24:31,169 INFO  io.debezium.util.Threads
>>>                     [] - Requested thread factory for connector
>>> PostgresConnector, id = postgres_cdc_source named = keep-alive
>>>
>>> 2024-06-14 09:24:31,169 INFO  io.debezium.util.Threads
>>>                     [] - Creating thread
>>> debezium-postgresconnector-postgres_cdc_source-keep-alive
>>>
>>> 2024-06-14 09:24:31,169 INFO  
>>> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
>>> [] - Processing messages
>>>
>>> 2024-06-14 09:24:33,958 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>>> PT10S ms and retrying, attempt number 4 over 6
>>>
>>> 2024-06-14 09:24:34,513 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>>> PT10S ms and retrying, attempt number 4 over 6
>>>
>>> 2024-06-14 09:24:44,113 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>>> PT10S ms and retrying, attempt number 5 over 6
>>>
>>> 2024-06-14 09:24:44,668 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>>> PT10S ms and retrying, attempt number 5 over 6
>>>
>>> 2024-06-14 09:24:54,332 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for
>>> PT10S ms and retrying, attempt number 6 over 6
>>>
>>> 2024-06-14 09:24:54,822 WARN  
>>> io.debezium.connector.postgresql.connection.PostgresReplicationConnection
>>> [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for
>>> PT10S ms and retrying, attempt number 6 over 6
>>>
>>> 2024-06-14 09:24:55,788 INFO  
>>> io.debezium.connector.postgresql.connection.WalPositionLocator
>>> [] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering
>>>
>>> 2024-06-14 09:24:55,798 INFO  
>>> io.debezium.connector.postgresql.connection.WalPositionLocator
>>> [] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering
>>>
>>
