These sinks share the same source. The pipeline that works looks something like this:

table1 -> process1 -> process2 -> sink2

When I change it to this:

table1 -> process1 -> process2 -> sink2
                   `--> sink1

I get the errors described, where it appears that a second process is created that attempts to use the current slot twice.

On Mon, Jun 17, 2024 at 1:58 AM Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi David,
>
> When I add this second sink, the postgres-cdc connector appears to add a
> second reader from the replication log, but with the same slot name.
>
> I don't understand what you mean by adding a second sink. Do they share
> the same source, or does each have a separate pipeline? If the former one,
> you can share the same source for two sinks, in which case one replication
> slot is sufficient. If the latter one, if you want each sink to have its own
> source, you can set a different slot name for each source (the option name
> is slot.name[1]).
>
> [1]
> https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/postgres-cdc/#connector-options
>
> On Sat, Jun 15, 2024 at 12:40 AM David Bryson <d...@eplant.bio> wrote:
>
>> Hi,
>>
>> I have a stream reading from postgres-cdc connector version 3.1.0. I read
>> from two tables:
>>
>> flink.cleaned_migrations
>> public.cleaned
>>
>> I convert the tables into a datastream, do some processing, then write it
>> to a sink at the end of my stream:
>>
>> joined_table_result = joined_with_metadata.execute_insert(daily_sink_property_map['flink_table_name'])
>>
>> This works well, however I recently tried to add a second table which
>> contains state reached in the middle of my stream:
>>
>> continuous_metrics_table = table_env.execute_sql("SELECT f1, f2, f3 from joined_processed_table")
>>
>> continuous_metrics_table.execute_insert(continuous_sink_property_map['flink_table_name'])
>>
>> When I add this second sink, the postgres-cdc connector appears to add a
>> second reader from the replication log, but with the same slot name.
>> It seems to behave this way regardless of the sink connector I use, and seems
>> to happen in addition to the existing slot that is already allocated to the
>> stream. This second reader of course cannot use the same replication slot,
>> and so the connector eventually times out. Is this expected behavior from
>> the connector? It seems strange the connector would attempt to use a slot
>> twice.
>>
>> I am using incremental snapshots, and I am passing a unique slot per
>> table connector.
>>
>> Logs below:
>>
>> 2024-06-14 09:23:59,600 INFO org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils [] - Postgres captured tables : flink.cleaned_migrations .
>> 2024-06-14 09:23:59,603 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
>> 2024-06-14 09:24:00,198 INFO org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils [] - Postgres captured tables : public.cleaned .
>> 2024-06-14 09:24:00,199 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
>> 2024-06-14 09:24:00,224 INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Creating initial offset context
>> 2024-06-14 09:24:00,417 INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Read xlogStart at 'LSN{6/C9806378}' from transaction '73559679'
>> 2024-06-14 09:24:00,712 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
>> 2024-06-14 09:24:00,712 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success
>> 2024-06-14 09:24:00,712 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}.
>> 2024-06-14 09:24:00,714 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}]
>> 2024-06-14 09:24:00,714 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
>> 2024-06-14 09:24:00,716 INFO org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator [] - The enumerator receives notice from subtask 0 for the stream split assignment.
>> 2024-06-14 09:24:00,721 INFO org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext [] - PostgresConnectorConfig is
>> 2024-06-14 09:24:00,847 INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Creating initial offset context
>> 2024-06-14 09:24:01,000 INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource [] - Read xlogStart at 'LSN{6/C9806430}' from transaction '73559682'
>> 2024-06-14 09:24:01,270 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
>> 2024-06-14 09:24:01,271 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success
>> 2024-06-14 09:24:01,271 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}.
>> 2024-06-14 09:24:01,272 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}]
>> 2024-06-14 09:24:01,273 INFO org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator [] - The enumerator receives notice from subtask 0 for the stream split assignment.
>> 2024-06-14 09:24:01,274 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
>> 2024-06-14 09:24:01,276 INFO org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext [] - PostgresConnectorConfig is
>> 2024-06-14 09:24:02,150 INFO io.debezium.connector.postgresql.PostgresObjectUtils [] - Creating a new replication connection for io.debezium.connector.postgresql.PostgresTaskContext@23f6230
>> 2024-06-14 09:24:02,176 INFO org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask [] - Execute StreamSplitReadTask for split: StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}
>> 2024-06-14 09:24:02,176 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Retrieved latest position from stored offset 'LSN{6/C98060F8}'
>> 2024-06-14 09:24:02,177 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{6/C98060F8}'
>> 2024-06-14 09:24:02,740 INFO io.debezium.connector.postgresql.PostgresObjectUtils [] - Creating a new replication connection for io.debezium.connector.postgresql.PostgresTaskContext@3d3d6b08
>> 2024-06-14 09:24:02,742 INFO org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask [] - Execute StreamSplitReadTask for split: StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676, lastCommitTs=-9223372036854775808], endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810], isSuspended=false}
>> 2024-06-14 09:24:02,742 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Retrieved latest position from stored offset 'LSN{6/C98061B0}'
>> 2024-06-14 09:24:02,743 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{6/C98061B0}'
>> 2024-06-14 09:24:02,779 INFO io.debezium.connector.postgresql.connection.PostgresConnection [] - Obtained valid replication slot ReplicationSlot [active=true, latestFlushedLsn=LSN{6/B8C36900}, catalogXmin=73461500]
>> 2024-06-14 09:24:02,783 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
>> 2024-06-14 09:24:03,317 INFO io.debezium.connector.postgresql.connection.PostgresConnection [] - Obtained valid replication slot ReplicationSlot [active=true, latestFlushedLsn=LSN{6/B8C36820}, catalogXmin=73461500]
>> 2024-06-14 09:24:03,322 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
>> 2024-06-14 09:24:03,488 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 1 over 6
>> 2024-06-14 09:24:04,017 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 1 over 6
>> 2024-06-14 09:24:13,649 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 2 over 6
>> 2024-06-14 09:24:14,167 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 2 over 6
>> 2024-06-14 09:24:23,799 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 3 over 6
>> 2024-06-14 09:24:24,320 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 3 over 6
>> 2024-06-14 09:24:30,481 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - First LSN 'LSN{6/C9807FD8}' received
>> 2024-06-14 09:24:30,491 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - WAL resume position 'LSN{6/C9807FD8}' discovered
>> 2024-06-14 09:24:30,494 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
>> 2024-06-14 09:24:30,517 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - First LSN 'LSN{6/C9807FD8}' received
>> 2024-06-14 09:24:30,517 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - WAL resume position 'LSN{6/C9807FD8}' discovered
>> 2024-06-14 09:24:30,520 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
>> 2024-06-14 09:24:31,138 INFO io.debezium.util.Threads [] - Requested thread factory for connector PostgresConnector, id = postgres_cdc_source named = keep-alive
>> 2024-06-14 09:24:31,138 INFO io.debezium.util.Threads [] - Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
>> 2024-06-14 09:24:31,139 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Processing messages
>> 2024-06-14 09:24:31,169 INFO io.debezium.util.Threads [] - Requested thread factory for connector PostgresConnector, id = postgres_cdc_source named = keep-alive
>> 2024-06-14 09:24:31,169 INFO io.debezium.util.Threads [] - Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
>> 2024-06-14 09:24:31,169 INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Processing messages
>> 2024-06-14 09:24:33,958 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 4 over 6
>> 2024-06-14 09:24:34,513 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 4 over 6
>> 2024-06-14 09:24:44,113 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 5 over 6
>> 2024-06-14 09:24:44,668 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 5 over 6
>> 2024-06-14 09:24:54,332 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 6 over 6
>> 2024-06-14 09:24:54,822 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 6 over 6
>> 2024-06-14 09:24:55,788 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering
>> 2024-06-14 09:24:55,798 INFO io.debezium.connector.postgresql.connection.WalPositionLocator [] - Message with LSN 'LSN{6/C9807FD8}' arrived, switching off the filtering
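
For anyone reading the quoted logs, here is a minimal sketch for summarising the retry pattern. It is not part of Flink CDC or Debezium: `retries_by_lsn` and `SAMPLE` are hypothetical names, and the sample lines are copied from the log above with the mail wrapping removed. It groups the "Failed to start replication stream" warnings by their start LSN, so it is easier to see that two readers are retrying side by side.

```python
import re
from collections import defaultdict

# Matches the Debezium retry warning seen in the quoted logs.
# re.S lets ".*?" span a wrapped line, since mail clients often re-wrap logs.
RETRY = re.compile(
    r"Failed to start replication stream at LSN\{(?P<lsn>[0-9A-F]+/[0-9A-F]+)\}"
    r".*?attempt number (?P<attempt>\d+) over (?P<limit>\d+)",
    re.S,
)


def retries_by_lsn(log_text):
    """Return {start LSN: [attempt numbers]} for every retry warning found."""
    attempts = defaultdict(list)
    for match in RETRY.finditer(log_text):
        attempts[match.group("lsn")].append(int(match.group("attempt")))
    return dict(attempts)


# Sample lines taken from the log in this thread (unwrapped); the INFO line
# is included only to show that non-retry entries are ignored.
SAMPLE = """\
2024-06-14 09:24:03,322 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
2024-06-14 09:24:03,488 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 1 over 6
2024-06-14 09:24:04,017 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 1 over 6
2024-06-14 09:24:13,649 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98060F8}, waiting for PT10S ms and retrying, attempt number 2 over 6
2024-06-14 09:24:14,167 WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Failed to start replication stream at LSN{6/C98061B0}, waiting for PT10S ms and retrying, attempt number 2 over 6
"""

if __name__ == "__main__":
    for lsn, attempts in retries_by_lsn(SAMPLE).items():
        print(lsn, attempts)
```

Two start LSNs retrying in lockstep is consistent with two readers contending for a replication slot, but the sketch only summarises the log text; it does not by itself show which slot name each reader used.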