Hi, David. We've run into a similar problem with the PG connection: the error message was 'Socket is closed', and we put a lot of effort into investigating it but couldn't find the root cause. We then changed the publication mode[1] so that only the changes of certain tables are subscribed, using the following connector options (see the DDL sketch below for how they fit together):

'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'flink_cleaned_v2',
'debezium.publication.name' = 'flink_cleaned_v2_publication',
'debezium.publication.autocreate.mode' = 'filtered'
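For reference, here is a minimal sketch of how these options could sit in a Flink SQL source table DDL; the table name, column, host, and credentials below are placeholders for illustration, not your actual setup:

CREATE TABLE your_table_source (
    id BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '<rds-endpoint>',           -- placeholder
    'port' = '5432',
    'username' = '<user>',                   -- placeholder
    'password' = '<password>',               -- placeholder
    'database-name' = '<database>',          -- placeholder
    'schema-name' = 'public',
    'table-name' = 'your_table',
    'scan.startup.mode' = 'latest-offset',   -- matches the startup mode you mentioned
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'flink_cleaned_v2',
    'debezium.publication.name' = 'flink_cleaned_v2_publication',
    'debezium.publication.autocreate.mode' = 'filtered'
);

With 'debezium.publication.autocreate.mode' = 'filtered', the publication only covers the captured tables instead of FOR ALL TABLES, which is what cuts down the decoded WAL traffic sent to the client.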
The publication and slot should be pre-created with SQL like the following:

CREATE PUBLICATION flink_cleaned_v2_publication FOR TABLE your_table;
SELECT pg_create_logical_replication_slot('flink_cleaned_v2', 'pgoutput');

This reduces the network traffic sent to the client, and since making the change we have almost never seen the disconnection happen. Maybe you can give this a try; I'm looking forward to your feedback.

[1] https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-publication-autocreate-mode

David Bryson <d...@eplant.bio> wrote on Wed, Jul 3, 2024 at 08:46:

> I have a flink stream using Postgres-CDC as a source. It's been operating
> mostly fine, but I recently had to stop, and then start the stream again.
> The stream is never able to start again as the replication never completes
> and Flink enters a restart loop.
>
> Upon starting, the cdc reader task issues a "START REPLICATION" call on the
> postgres primary; this call then spent 1-1.5 hours transferring data and
> the operator is 100% busy.
> I'm not sure why the connector would not resume from the most recent
> snapshot, as the configuration is for 'latest-offset'. Here are the
> connector options:
>
> "slot.name": "flink_cleaned_v2",
> "heartbeat.interval.ms": "15000",
> "scan.snapshot.fetch.size": "8192",
> "debezium.max.queue.size": "2048",
> "debezium.max.batch.size": "1024",
> "scan.incremental.snapshot.enabled": "true",
> "scan.incremental.snapshot.chunk.size": "80960",
> "debezium.slot.drop.on.stop": "false",
> "debezium.slot.max.retries": "15",
> "debezium.slot.retry.delay.ms": "67000",
>
> The logs on the RDS suggest that the CDC client is disconnecting, and the
> logs on Flink seem to suggest the RDS is disconnecting. I'm very confused
> by this as my wal_sender_timeout is 120s. Are there other settings I
> should be adjusting? How can I figure out who is disconnecting from whom?
> It really feels like a socket/keep alive timeout of some kind is being
> missed.
>
> Flink 1.18
> CDC 3.1.1
>
> The RDS logs:
>
> 2024-07-02 21:02:22 UTC:10.32.1.45(7501):postgres_rw@dendrometer:[24901]:LOG: could not receive data from client: Connection reset by peer
> 2024-07-02 21:02:22 UTC:10.32.0.67(10827):postgres_rw@dendrometer:[26483]:LOG: could not receive data from client: Connection reset by peer
>
> Flink throwable:
>
> java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:173)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     ... 1 more
> Caused by: java.io.IOException: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
>     at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     ... 6 more
> Caused by: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
>     at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
>     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:214)
>     at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:216)
>     at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:97)
>     at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
>     ... 5 more
> Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
>     at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1144)
>     at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:30)
>     at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:195)
>     at org.postgresql.core.v3.replication.V3PGReplicationStream.timeUpdateStatus(V3PGReplicationStream.java:186)
>     at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:128)
>     at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
>     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:588)
>     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:257)
>     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212)
>     ... 8 more
> Caused by: java.net.SocketException: Broken pipe (Write failed)
>     at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
>     at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
>     at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
>     at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
>     at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1305)
>     at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
>     at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
>     at org.postgresql.core.PGStream.flush(PGStream.java:709)
>     at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1142)
>