hmalaspina opened a new issue, #10149:
URL: https://github.com/apache/seatunnel/issues/10149

   Hello, 
   I am having issues with using the Postgres-CDC source connector.
   I have a setup that streams the captured changes on a Postgres table to an 
Iceberg table.
   In our preproduction environment that has very low traffic it's working fine.
   In our production environment it periodically gets 
`java.net.SocketException: Socket is closed` leading to the job being cancelled 
after too many exceptions.
   I have tried increasing the Postgres `wal_sender_timeout` to 20 minutes, but 
it makes no difference compared with the default of 1 minute.
   
   Some information about our setup:
   The job is run with the Zeta engine in a Seatunnel cluster comprised of 2 
masters and 2 workers, all in EKS.
   The Postgres DB version is 16.8. The Postgres instance and the Seatunnel 
cluster are in the same VPC.
   The Seatunnel has been deployed using the Seatunnel provided helm chart.
   
   The only meaningful change made to values.yaml is to pin the 
SeaTunnel version to 2.3.11, as I have never managed to get the Postgres 
connector to connect to our table in 2.3.12.
   
   Here is the full diff of the values for the sake of completeness:
   ```
   Common subdirectories: seatunnel-helm-original/conf and seatunnel-helm/conf
   Common subdirectories: seatunnel-helm-original/templates and 
seatunnel-helm/templates
   diff --color=auto seatunnel-helm-original/values.yaml 
seatunnel-helm/values.yaml
   24c24
   <   tag: ""
   ---
   >   tag: "2.3.11"
   31c31
   <     value: Asia/Shanghai
   ---
   >     value: UTC
   48,51c48,51
   <     prometheus.io/path: /hazelcast/rest/instance/metrics
   <     prometheus.io/port: "5801"
   <     prometheus.io/scrape: "true"
   <     prometheus.io/role: "seatunnel-master"
   ---
   >     # prometheus.io/path: /hazelcast/rest/instance/metrics
   >     # prometheus.io/port: "5801"
   >     # prometheus.io/scrape: "true"
   >     # prometheus.io/role: "seatunnel-master"
   58c58,59
   <   nodeSelector: {}
   ---
   >   nodeSelector:
   >     node.family: on-demand
   61c62,66
   <   tolerations: []
   ---
   >   tolerations:
   >     - key: node.family
   >       operator: Equal
   >       value: on-demand
   >       effect: NoSchedule
   64,71c69,75
   <   resources: {}
   <   # resources:
   <   #   limits:
   <   #     memory: "4Gi"
   <   #     cpu: "4"
   <   #   requests:
   <   #     memory: "2Gi"
   <   #     cpu: "500m"
   ---
   >   resources:
   >     limits:
   >       memory: "4Gi"
   >       cpu: "2"
   >     requests:
   >       memory: "4Gi"
   >       cpu: "2"
   111a116
   >     karpenter.sh/do-not-disrupt: "true"
   124,131c129,135
   <   resources: {}
   <   # resources:
   <   #   limits:
   <   #     memory: "4Gi"
   <   #     cpu: "4"
   <   #   requests:
   <   #     memory: "2Gi"
   <   #     cpu: "500m"
   ---
   >   resources:
   >     limits:
   >       memory: "10Gi"
   >       cpu: "2"
   >     requests:
   >       memory: "10Gi"
   >       cpu: "2"
   160a165,196
   >
   > # Secret configuration for database credentials
   > secret:
   >   # Enable secret creation
   >   enabled: true
   >   # Secret name
   >   name: "postgres-credentials"
   >   # Database credentials
   >   dbUser: "redacted"
   >   dbPassword: "redacted" # your password here
   >
   > # Persistent Volume Claim for checkpoint storage
   > persistence:
   >   # Enable PVC creation for checkpoint storage
   >   enabled: true
   >   # PVC name
   >   name: "seatunnel-checkpoint-pvc"
   >   # Access mode
   >   accessModes:
   >     - ReadWriteMany
   >   # Storage size
   >   size: 10Gi
   >   # Storage class name (optional)
   >   storageClassName: "efs-sc"
   >   # Mount path for checkpoints
   >   mountPath: "/tmp/seatunnel/checkpoint_snapshot"
   >
   > # ServiceAccount configuration
   > serviceAccount:
   >   # Annotations for the ServiceAccount (e.g., EKS IAM role)
   >   annotations:
   >     eks.amazonaws.com/role-arn: "redacted
   ```
   
   Here are the current Postgres parameters we are running with.
   ```
   wal_sender_timeout 6min
   tcp_keepalives_idle 300
   tcp_keepalives_interval 30
   tcp_keepalives_count 3
   ```
   
   This is our job definition:
   ```
   # SeaTunnel Job: PostgreSQL CDC to Iceberg Test Table
   # Streams real-time changes from PostgreSQL redacted to Iceberg table 
redacted
   
   env {
     execution.parallelism = 1
     job.retry.times = 3
     job.mode = "STREAMING"
     job.name = "redacted"
     
     # Enable checkpointing for fault tolerance - every 5 minutes # low 
frequency checkpoints allow for less frequent writes to iceberg
     checkpoint.interval = 300000
     checkpoint.timeout = 900000
     
     
     read_limit.rows_per_second = 2500
   }
   
   source {
     Postgres-CDC {
       # Plugin identifier
       plugin_output = "postgres_cdc_source"
       # Database connection
       base-url = "jdbc:postgresql://redacted/redacted?tcpKeepAlive=true"
       username = "redacted"
       password = ${DB_PASSWORD}
       
       # Database and schema to monitor
       database-names = ["redacted"]
       schema-names = ["public"]
       
       # Tables to monitor (format: database.schema.table)
       table-names = ["redacted"]
       
       # Startup mode
       # LATEST: Skip snapshot, start streaming from current WAL position 
(streaming only)
       # INITIAL: Read full snapshot, then continue streaming (recommended for 
CDC)
       # NOTE: After job failure, start fresh job (don't use -r resume, it's 
broken).
       # INITIAL mode + replication slot = automatic recovery from last 
committed position
       startup.mode = "INITIAL"
       
   
       slot.name = "seatunnel_iceberg_test_slot"
       
       # Decoding plugin (pgoutput is recommended for PostgreSQL 10+)
       decoding.plugin.name = "pgoutput"
       
       # Snapshot configuration - optimized for high throughput with 16GB pod
       # With 146M rows and 500k chunks = ~300 splits (manageable memory)
       snapshot.split.size = 500000     # 500k rows per split (CRITICAL - keeps 
split count low)
       snapshot.fetch.size = 30000      # 30k rows per fetch (high performance)
       
       # Incremental snapshot chunk size (for parallel snapshot reading)
       scan.incremental.snapshot.chunk.size = 200000
       
       # Connection settings
       connect.timeout.ms = 800000
       connect.max-retries = 6
       connection.pool.size = 20
       
       # Server timezone
       server-time-zone = "UTC"
       
       # Exactly-once semantics (recommended for production)
       exactly_once = true
       
       # Pass-through Debezium properties
       # Use existing publication created by table owner
       debezium = {
           "publication.autocreate.mode" = "disabled"
           "publication.name" = "seatunnel_cdc_publication"
                # keepalive / heartbeat
       }
       
       # Output format
       format = "DEFAULT"
     }
   }
   
   sink {
     Iceberg {
       plugin_input = "postgres_cdc_source"
       
       catalog_name = "glue_catalog"
       catalog_type = "glue"
       
       iceberg.table.upsert-mode-enabled = true
       primary_keys = "redacted"
       # AWS Glue catalog configuration with AssumeRoleAwsClientFactory
       iceberg.catalog.config = {
         "catalog-impl" = "org.apache.iceberg.aws.glue.GlueCatalog"
         "warehouse" = "redacted"
         "io-impl" = "org.apache.iceberg.aws.s3.S3FileIO"
         "glue.account-id" = "redacted"
         # Use AssumeRoleAwsClientFactory for cross-account access
         "client.factory" = "org.apache.iceberg.aws.AssumeRoleAwsClientFactory"
         "client.assume-role.arn" = "redacted"
         "client.assume-role.region" = "redacted
         "write.update.mode" = "merge-on-read"
         "write.delete.mode" = "merge-on-read"
         # "write.target-file-size-bytes" = "67108864"  # 64MB files
       }
       
       iceberg.table.write-props = {
         "write.update.mode" = "merge-on-read"
         "write.delete.mode" = "merge-on-read"
   
       }
   
       namespace = "redacted"
       table = "redacted"
     }
   }
   ```
   
   This is our stack trace when the exception happens:
   ```
   Exception in thread "main" 
org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel 
job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
java.lang.RuntimeException: One or more fetchers have encountered exception
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
        at 
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
        at 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
        at 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
        ... 5 more
   Caused by: org.apache.kafka.connect.errors.RetriableException: An exception 
occurred in the change event producer. This connector will be restarted.
        at 
io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
        at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:180)
        at 
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.wal.PostgresWalFetchTask.execute(PostgresWalFetchTask.java:74)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:107)
        ... 5 more
   Caused by: org.postgresql.util.PSQLException: Database connection failed 
when reading from copy
        at 
org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1166)
        at 
org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:44)
        at 
org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:160)
        at 
org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:125)
        at 
org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
        at 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:504)
        at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:215)
        at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:177)
        ... 7 more
   Caused by: java.net.SocketException: Socket is closed
        at java.net.Socket.setSoTimeout(Socket.java:1155)
        at 
sun.security.ssl.BaseSSLSocketImpl.setSoTimeout(BaseSSLSocketImpl.java:639)
        at sun.security.ssl.SSLSocketImpl.setSoTimeout(SSLSocketImpl.java:73)
        at org.postgresql.core.PGStream.hasMessagePending(PGStream.java:210)
        at 
org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1208)
        at 
org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1164)
        ... 14 more
   
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
        ... 2 more
   2025-12-03 00:17:29,651 INFO  [s.c.s.s.c.ClientExecuteCommand] 
[SeaTunnel-CompletableFuture-Thread-0] - run shutdown hook because get close 
signal
   ```
   
   And at the same time we see this in the logs of our database.
   ```
   SSL error: SSLV3_ALERT_UNEXPECTED_MESSAGE
   LOG:  could not receive data from client: Connection reset by peer
   LOG:  unexpected EOF on standby connection
   CONTEXT: slot "seatunnel_iceberg_test_slot", output plugin "pgoutput"
   ```
   
   Does anyone know about this issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to