wsm-zao opened a new issue, #10120: URL: https://github.com/apache/seatunnel/issues/10120
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

When using SeaTunnel's MySQL-CDC source connector to synchronize data from MySQL to a RocketMQ sink, every single UPDATE on the MySQL table produces 3 identical (or redundant) messages in the target RocketMQ topic, instead of the expected 1 message per update. The behavior is consistent across different kinds of UPDATE statements (e.g., single-row updates, conditional updates) and is not limited to specific tables or data volumes. The duplicate messages carry the same business data (including before-update/after-update snapshots or full records) and are sent to the same RocketMQ topic/queue within a short time window (milliseconds apart).

The duplication disrupts downstream consumption: consumers receive redundant data, which leads to incorrect business logic (e.g., repeated processing, inconsistent statistics) and increased storage, network, and compute overhead.

Notably, the MySQL binlog records only 1 UPDATE event per operation (verified with the mysqlbinlog tool), which rules out multiple binlog events being generated by the source database. The duplication occurs exclusively during SeaTunnel's synchronization to RocketMQ.
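As context for why one UPDATE can fan out into several sink messages: CDC frameworks commonly represent a single UPDATE as a pair of change rows, a before-image (`UPDATE_BEFORE`) and an after-image (`UPDATE_AFTER`). A sink that serializes every row it receives would then emit at least 2 messages per UPDATE, and a redundant resend could account for a third. The sketch below is a hypothetical, self-contained simulation of that fan-out plus a consumer-side deduplication idea keyed on primary key and a content hash; the row-kind names only mirror CDC terminology, and none of this is SeaTunnel or RocketMQ API.

```python
import hashlib
import json

# Hypothetical model: one MySQL UPDATE arrives at the sink as a pair of
# change rows (before-image and after-image), as in common CDC frameworks.
def update_to_change_rows(before, after):
    return [
        {"kind": "UPDATE_BEFORE", "data": before},
        {"kind": "UPDATE_AFTER", "data": after},
    ]

# A naive sink that serializes every change row it receives emits one
# message per row -- i.e. at least 2 messages per UPDATE.
def naive_sink(rows):
    return [json.dumps(r, sort_keys=True) for r in rows]

# Consumer-side mitigation: keep only after-images and drop exact
# duplicates, keyed on (primary key, content hash).
def dedup_consume(messages, key_field="log_id", seen=None):
    seen = set() if seen is None else seen
    kept = []
    for m in messages:
        row = json.loads(m)
        if row["kind"] != "UPDATE_AFTER":
            continue  # discard before-images
        digest = hashlib.sha256(
            json.dumps(row["data"], sort_keys=True).encode()
        ).hexdigest()
        key = (row["data"][key_field], digest)
        if key in seen:
            continue  # discard redundant resends of the same change
        seen.add(key)
        kept.append(row["data"])
    return kept

if __name__ == "__main__":
    before = {"log_id": 1, "status": "old"}
    after = {"log_id": 1, "status": "new"}
    msgs = naive_sink(update_to_change_rows(before, after))
    msgs.append(msgs[-1])  # simulate one redundant resend: 3 messages total
    print(len(msgs))            # 3
    print(dedup_consume(msgs))  # a single effective record survives
```

This only mitigates the symptom on the consumer side; the actual fix belongs in the connector, which is why the issue is filed here.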
### SeaTunnel Version

2.3.10

### SeaTunnel Config

```conf
{
  "env": {
    "parallelism": 1,
    "job.mode": "STREAMING",
    "job.name": "update_mq"
  },
  "source": [
    {
      "plugin_name": "MySQL-CDC",
      "server-id": "2-100",
      "base-url": "jdbc:mysql://192.168.56.200:3306/cdc_test",
      "username": "root",
      "password": "123456",
      "table-names": ["cdc_test.operation_logs"],
      "startup.mode": "latest"
    }
  ],
  "sink": [
    {
      "plugin_name": "Rocketmq",
      "name.srv.addr": "192.168.56.200:9876",
      "topic": "test_topic",
      "tag": "operation_logs",
      "partition.key.fields": ["log_id"],
      "producer.send.sync": true
    }
  ]
}
```

### Running Command

```yaml
# master.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: seatunnel-cluster-master
  namespace: seatunnel
  labels:
    app.kubernetes.io/name: seatunnel-cluster-master
    app.kubernetes.io/component: master
    app.kubernetes.io/version: 2.3.10
spec:
  replicas: 1
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 50%
  selector:
    matchLabels:
      app.kubernetes.io/instance: seatunnel-cluster-app
      app.kubernetes.io/version: 2.3.10
      app.kubernetes.io/name: seatunnel-cluster-master
      app.kubernetes.io/component: master
  template:
    metadata:
      labels:
        app.kubernetes.io/instance: seatunnel-cluster-app
        app.kubernetes.io/version: 2.3.10
        app.kubernetes.io/name: seatunnel-cluster-master
        app.kubernetes.io/component: master
      annotations:
        prometheus.io/path: "/hazelcast/rest/instance/metrics"
        prometheus.io/port: "5801"
        prometheus.io/scrape: "true"
        prometheus.io/role: "seatunnel-master"
    spec:
      nodeSelector:
        seatunnel: "enable"
      containers:
        - name: seatunnel-master
          image: seatunnel-oss:v1
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 5801
              name: hazelcast
              protocol: TCP
            - containerPort: 8080
              name: master-port
              protocol: TCP
          command:
            - /bin/sh
            - -c
            - /opt/seatunnel/bin/seatunnel-cluster.sh -r master -DJvmOption="-Xms256m -Xmx512m -XX:MaxMetaspaceSize=256m -XX:+UseG1GC"
          resources:
            requests:
              cpu: "0.5"
              memory: "512Mi"
            limits:
              cpu: "1"
              memory: "1Gi"
          volumeMounts:
            - mountPath: "/opt/seatunnel/config/hazelcast-master.yaml"
              name: seatunnel-configs
              subPath: hazelcast-master.yaml
            - mountPath: "/opt/seatunnel/config/hazelcast-worker.yaml"
              name: seatunnel-configs
              subPath: hazelcast-worker.yaml
            - mountPath: "/opt/seatunnel/config/seatunnel.yaml"
              name: seatunnel-configs
              subPath: seatunnel.yaml
            - mountPath: "/opt/seatunnel/config/plugin_config"
              name: seatunnel-configs
              subPath: plugin_config
          livenessProbe:
            httpGet:
              path: /hazelcast/health/node-state
              port: 5801
              scheme: HTTP
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /hazelcast/health/node-state
              port: 5801
              scheme: HTTP
            initialDelaySeconds: 15
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 3
      volumes:
        - name: seatunnel-configs
          configMap:
            name: seatunnel-cluster-configs
```

```yaml
# worker.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: seatunnel-cluster-worker
  namespace: seatunnel
  labels:
    app.kubernetes.io/name: seatunnel-cluster-worker
    app.kubernetes.io/component: worker
    app.kubernetes.io/version: 2.3.10
spec:
  replicas: 1
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 50%
  selector:
    matchLabels:
      app.kubernetes.io/instance: seatunnel-cluster-app
      app.kubernetes.io/version: 2.3.10
      app.kubernetes.io/name: seatunnel-cluster-worker
      app.kubernetes.io/component: worker
  template:
    metadata:
      labels:
        app.kubernetes.io/instance: seatunnel-cluster-app
        app.kubernetes.io/version: 2.3.10
        app.kubernetes.io/name: seatunnel-cluster-worker
        app.kubernetes.io/component: worker
      annotations:
        prometheus.io/path: "/hazelcast/rest/instance/metrics"
        prometheus.io/port: "5801"
        prometheus.io/scrape: "true"
        prometheus.io/role: "seatunnel-worker"
    spec:
      nodeSelector:
        seatunnel: "enable"
      containers:
        - name: seatunnel-worker
          image: seatunnel-oss:v1
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 5801
              name: hazelcast
              protocol: TCP
          command:
            - /opt/seatunnel/bin/seatunnel-cluster.sh
            - -r
            - worker
          resources:
            requests:
              cpu: "1"
              memory: "1Gi"
            limits:
              cpu: "1.5"
              memory: "2Gi"
          volumeMounts:
            - mountPath: "/opt/seatunnel/config/hazelcast-master.yaml"
              name: seatunnel-configs
              subPath: hazelcast-master.yaml
            - mountPath: "/opt/seatunnel/config/hazelcast-worker.yaml"
              name: seatunnel-configs
              subPath: hazelcast-worker.yaml
            - mountPath: "/opt/seatunnel/config/seatunnel.yaml"
              name: seatunnel-configs
              subPath: seatunnel.yaml
            - mountPath: "/opt/seatunnel/config/plugin_config"
              name: seatunnel-configs
              subPath: plugin_config
          livenessProbe:
            httpGet:
              path: /hazelcast/health/node-state
              port: 5801
              scheme: HTTP
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
          readinessProbe:
            httpGet:
              path: /hazelcast/health/node-state
              port: 5801
              scheme: HTTP
            initialDelaySeconds: 15
            periodSeconds: 5
            timeoutSeconds: 3
      volumes:
        - name: seatunnel-configs
          configMap:
            name: seatunnel-cluster-configs
```

### Error Exception

```log
No explicit exceptions, stack traces, or error logs are produced while the job runs.
The SeaTunnel job starts successfully, stays in RUNNING status in the Kubernetes
cluster (verified via kubectl get pods and the SeaTunnel logs), and continuously
outputs normal synchronization logs (e.g., "CDC binlog read successfully",
"Message sent to RocketMQ"). There are no error-level entries such as connection
failures, serialization exceptions, or sink write errors in the Pod logs
(kubectl logs <seatunnel-pod-name>) or the SeaTunnel job logs.
```

### Zeta or Flink or Spark Version

Zeta 2.3.10

### Java or Scala Version

_No response_

### Screenshots

<img width="2452" height="333" alt="Image" src="https://github.com/user-attachments/assets/60384145-f6d8-4924-84f8-12c56880eeab" />

<img width="1658" height="694" alt="Image" src="https://github.com/user-attachments/assets/106a3d87-2b29-4278-ba3a-778b4c1b2713" />

<img width="1705" height="791" alt="Image" src="https://github.com/user-attachments/assets/b133b060-160b-4bcc-8dfb-04c9a636fce1" />

### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
