DcWww opened a new issue, #9807: URL: https://github.com/apache/seatunnel/issues/9807
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

In a high-availability cluster (3 Master nodes: a, b, c), a data synchronization job from PostgreSQL to Apache Doris / PostgreSQL runs on the current Leader Master (a). After manually terminating (`kill`) the Master a process, I observed the following:

1. The Leader Master becomes b.
2. The job still shows as "running", but data changes made to the PG database at this point are no longer applied to Doris.
3. If the job is then stopped manually via `/hazelcast/rest/maps/stop-jobs`, the data in Doris is immediately updated to the latest state.

### SeaTunnel Version

2.3.9

### SeaTunnel Config

```conf
seatunnel:
  engine:
    classloader-cache-mode: true
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: hdfs://ctyunns
          #if you need hdfs-site config, you can config like this:
          hdfs_site_path: "/hadoop-conf/hdfs-site.xml"
          seatunnel.hadoop.dfs.nameservices: ctyunns
          seatunnel.hadoop.dfs.ha.namenodes.ctyunns: nn1,nn2
          seatunnel.hadoop.dfs.namenode.rpc-address.ctyunns.nn1: a1:54310
          seatunnel.hadoop.dfs.namenode.rpc-address.ctyunns.nn2: a7:54310
          seatunnel.hadoop.dfs.client.failover.proxy.provider.ctyunns: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
          # if you used kerberos, you can config like this:
          krb5Path: "/hadoop-conf/krb5.conf"
          kerberosPrincipal: "hdfs/[email protected]"
          kerberosKeytabFilePath: "/hadoop-conf/hdfs.keytab"
    telemetry:
      metric:
        enabled: false
      log:
        scheduled-deletion-enable: true
    http:
      enable-http: true
      port: 8080
      enable-dynamic-port: false

hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        member-list:
          - a:5801
          - b:5801
          - c:5801
          - a:5802
          - b:5802
          - c:5802
    port:
      auto-increment: false
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
    hazelcast.heartbeat.failuredetector.type: phi-accrual
    hazelcast.heartbeat.interval.seconds: 2
    hazelcast.max.no.heartbeat.seconds: 180
    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
  map:
    engine*:
      map-store:
        enabled: true
        initial-mode: EAGER
        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
        properties:
          type: hdfs
          namespace: /seatunnel/imap
          clusterName: seatunnel
          storage.type: hdfs
          fs.defaultFS: hdfs://ctyunns
          hdfs_site_path: "/hadoop-conf/hdfs-site.xml"
          # Added Kerberos authentication config
          # if you need hdfs-site config, you can config like this:
          seatunnel.hadoop.dfs.nameservices: ctyunns
          seatunnel.hadoop.dfs.ha.namenodes.ctyunns: nn1,nn2
          seatunnel.hadoop.dfs.namenode.rpc-address.ctyunns.nn1: a1:54310
          seatunnel.hadoop.dfs.namenode.rpc-address.ctyunns.nn2: a7:54310
          seatunnel.hadoop.dfs.client.failover.proxy.provider.ctyunns: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
          # if you used kerberos, you can config like this:
          krb5Path: "/hadoop-conf/krb5.conf"
          kerberosPrincipal: "hdfs/[email protected]"
          kerberosKeytabFilePath: "/hadoop-conf/hdfs.keytab"
```

### Running Command

```shell
{
  "env": {
    "job.mode": "STREAMING",
    "checkpoint.timeout": "500000",
    "job.name": "SeaTunnel_Job",
    "read_limit.rows_per_second": "10000",
    "checkpoint.interval": "5000",
    "parallelism": "5"
  },
  "source": [{
    "base-url": "jdbc:postgresql://xxxx",
    "startup.mode": "latest",
    "table-names": [
      "xxx"
    ],
    "result_table_name": "102100000000052670_table_source",
    "database-names": [
      "xxxx"
    ],
    "plugin_name": "Postgres-CDC",
    "password": "xxxx",
    "snapshot.split.size": 20000,
    "snapshot.fetch.size": 5000,
    "connection.pool.size": "20",
    "connect.max-retries": "3",
    "connect.timeout.ms": "30000",
    "username": "xxx",
    "slot.name": "1952315293163560963_1005134",
    "debezium": {
      "publication.autocreate.mode": "filtered",
      "publication.name": "qeaadsrvos1"
    }
  }],
  "transform": [],
  "sink": [{
    "fenodes": "xxxx:8035",
    "query-port": "9030",
    "username": "xxx",
    "password": "xxxx",
    "database": "xxxx",
    "table": "${table_name}",
    "source_table_name": "102100000000052670_table_sink",
    "sink.enable-2pc": "false",
    "doris.batch.size": 20000,
    "sink.buffer-size": 20971520,
    "sink.enable-delete": true,
    "plugin_name": "Doris",
    "schema_save_mode": "ERROR_WHEN_SCHEMA_NOT_EXIST",
    "doris.config": {
      "format": "json",
      "read_json_by_line": "true",
      "num_as_string": "true"
    }
  }]
}
```

### Error Exception

```log
The data on the target side does not change, and no error message is reported.
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
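
For anyone reproducing this, the manual stop in step 3 of "What happened" could be scripted roughly as below. This is only a sketch and not part of the original report: the endpoint path `/hazelcast/rest/maps/stop-jobs` comes from the report, while the helper name `build_stop_jobs_request`, the host and port `http://b:5801`, the job id, and the payload shape (a JSON array of objects with `jobId` and `isStopWithSavePoint`) are assumptions that should be checked against the REST API documentation for the SeaTunnel version in use. The request is only built and printed, not sent.

```python
import json
import urllib.request


def build_stop_jobs_request(base_url, job_ids, with_savepoint=False):
    """Build (but do not send) a POST request for the stop-jobs endpoint."""
    # Assumed payload shape: a JSON array with one object per job to stop.
    payload = [
        {"jobId": job_id, "isStopWithSavePoint": with_savepoint}
        for job_id in job_ids
    ]
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/hazelcast/rest/maps/stop-jobs",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Hypothetical host, port, and job id; replace with real values.
    req = build_stop_jobs_request("http://b:5801", [123456789])
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
    # To actually send it: urllib.request.urlopen(req, timeout=10)
```

Pointing the request at the new Leader Master (b in the report) after the failover, and then checking whether Doris catches up, would mirror the behaviour described in step 3.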
