Hi, David.
We've run into a similar problem with the pg connection. The error message was
'Socket is closed', and we put a lot of effort into investigating it but
couldn't find the root cause.
We then changed the publication mode[1] and subscribed only to the changes of
certain tables, with the following connector options:
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'flink_cleaned_v2',
'debezium.publication.name' = 'flink_cleaned_v2_publication',
'debezium.publication.autocreate.mode' = 'filtered'

The publication and slot should be pre-created with SQL like the following:
CREATE PUBLICATION flink_cleaned_v2_publication FOR TABLE your_table;
SELECT pg_create_logical_replication_slot('flink_cleaned_v2', 'pgoutput');
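
For reference, here is a rough sketch of how those options fit into a Flink SQL
postgres-cdc source table; the hostname, credentials, and table/column names
below are placeholders, so please replace them with your own:

CREATE TABLE cleaned_source (
    id BIGINT,
    payload STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'your-rds-endpoint',
    'port' = '5432',
    'username' = 'your_user',
    'password' = 'your_password',
    'database-name' = 'your_database',
    'schema-name' = 'public',
    'table-name' = 'your_table',
    -- reuse the pre-created slot and publication from the SQL above
    'slot.name' = 'flink_cleaned_v2',
    'decoding.plugin.name' = 'pgoutput',
    'debezium.publication.name' = 'flink_cleaned_v2_publication',
    'debezium.publication.autocreate.mode' = 'filtered'
);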

This can reduce the network traffic sent to the client, and for us the
disconnections have almost stopped happening. Maybe you can give it a try;
I'm looking forward to your feedback.
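
As for your question about who is disconnecting from whom, one thing that
helped us was watching the walsender side on the primary while the job runs.
A query along these lines (just a sketch, assuming Postgres 10+ and the slot
name from your config) shows whether the slot still has a connected consumer
and how far it lags:

SELECT s.slot_name,
       s.active,        -- false means no walsender is attached to the slot
       r.client_addr,   -- address of the connected client (the CDC source)
       r.state,
       pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots s
LEFT JOIN pg_stat_replication r ON r.pid = s.active_pid
WHERE s.slot_name = 'flink_cleaned_v2';

Correlating the moment 'active' flips to false with the timestamps in the
Flink and RDS logs can help narrow down which side dropped the connection
first.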
[1]
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-publication-autocreate-mode


David Bryson <d...@eplant.bio> wrote on Wed, Jul 3, 2024 at 08:46:

> I have a flink stream using Postgres-CDC as a source.  It's been operating
> mostly fine, but I recently had to stop, and then start the stream again.
> The stream is never able to start again as the replication never completes
> and Flink enters a restart loop.
>
> Upon starting, the CDC reader task issues a "START REPLICATION" call on the
> postgres primary; this call then spends 1-1.5 hours transferring data while
> the operator is 100% busy.
> I'm not sure why the connector would not resume from the most recent
> snapshot, as the configuration is for 'latest-offset'.  Here are the
> connector options:
>
>      "slot.name": "flink_cleaned_v2",
>      "heartbeat.interval.ms": "15000",
>      "scan.snapshot.fetch.size": "8192",
>      "debezium.max.queue.size": "2048",
>      "debezium.max.batch.size": "1024",
>      "scan.incremental.snapshot.enabled": "true",
>      "scan.incremental.snapshot.chunk.size": "80960",
>      "debezium.slot.drop.on.stop": "false",
>      "debezium.slot.max.retries": "15",
>      "debezium.slot.retry.delay.ms": "67000",
>
> The logs on the RDS suggest that the CDC client is disconnecting, and the
> logs on Flink seem to suggest the RDS is disconnecting.  I'm very confused
> by this as my wal_sender_timeout is 120s.  Are there other settings I
> should be adjusting? How can I figure out who is disconnecting from whom?
> It really feels like a socket/keep-alive timeout of some kind is being
> missed.
>
> Flink 1.18
> CDC 3.1.1
>
> The RDS logs:
>
> 2024-07-02 21:02:22 UTC:10.32.1.45(7501):postgres_rw@dendrometer:[24901]:LOG: could not receive data from client: Connection reset by peer
> 2024-07-02 21:02:22 UTC:10.32.0.67(10827):postgres_rw@dendrometer:[26483]:LOG: could not receive data from client: Connection reset by peer
>
> Flink throwable:
>
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:173)
> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
> Caused by: java.io.IOException: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
> at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
> at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
> ... 6 more
> Caused by: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
> at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
> at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:214)
> at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:216)
> at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:97)
> at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
> ... 5 more
> Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
> at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1144)
> at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:30)
> at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:195)
> at org.postgresql.core.v3.replication.V3PGReplicationStream.timeUpdateStatus(V3PGReplicationStream.java:186)
> at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:128)
> at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
> at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:588)
> at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:257)
> at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212)
> ... 8 more
> Caused by: java.net.SocketException: Broken pipe (Write failed)
> at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
> at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
> at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
> at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
> at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1305)
> at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
> at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
> at org.postgresql.core.PGStream.flush(PGStream.java:709)
> at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1142)
>
