wsm-zao opened a new issue, #10120:
URL: https://github.com/apache/seatunnel/issues/10120

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   When SeaTunnel's MySQL-CDC source connector synchronizes data from MySQL to a RocketMQ sink, every UPDATE operation on the MySQL table produces 3 identical (or otherwise redundant) messages in the target RocketMQ topic instead of the expected 1 message per update.
   The issue occurs consistently across different types of UPDATE statements (e.g., single-row updates, conditional updates) and is not limited to specific tables or data volumes. The duplicate messages contain the same business data (including before-update/after-update snapshots or full data records) and are sent to the same RocketMQ topic/queue within milliseconds of each other.
   The problem disrupts downstream message consumption: consumers receive redundant data, which leads to incorrect business logic execution (e.g., repeated data processing, inconsistent statistics) and higher resource overhead (storage, network, and consumer compute costs).
   The MySQL binlog records only 1 UPDATE event for each operation (verified with the mysqlbinlog tool), so the source database is not generating multiple binlog events. The duplication is introduced during SeaTunnel's synchronization to RocketMQ.
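
To make the "3 messages per update" observation easy to reproduce, the duplication can be quantified on the consumer side. The following is only a minimal sketch: it assumes the message bodies have already been captured from `test_topic` into a list of strings, and it does not call any RocketMQ API. The function name `count_duplicates`, the sample payloads, and the `status` field are hypothetical and used for illustration only.

```python
from collections import Counter
from typing import Iterable


def count_duplicates(bodies: Iterable[str]) -> dict[str, int]:
    """Return every message body seen more than once, with its occurrence count."""
    counts = Counter(bodies)
    return {body: n for body, n in counts.items() if n > 1}


# Hypothetical sample: message bodies as a consumer might have captured them.
# The "status" field is invented for illustration; only log_id appears in the job config.
sample = [
    '{"log_id": 1, "status": "done"}',
    '{"log_id": 1, "status": "done"}',
    '{"log_id": 1, "status": "done"}',
    '{"log_id": 2, "status": "new"}',
]

duplicates = count_duplicates(sample)
for body, n in duplicates.items():
    print(f"{n}x {body}")
```

Comparing bodies byte-for-byte only detects exact duplicates. If the three messages differ in any field, they would need to be grouped by a key such as `log_id` instead.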
   
   ### SeaTunnel Version
   
   2.3.10
   
   ### SeaTunnel Config
   
   ```conf
   {
       "env": {
           "parallelism": 1,
           "job.mode": "STREAMING",
           "job.name": "update_mq"
       },
       "source": [
           {
               "plugin_name": "MySQL-CDC",
               "server-id": "2-100",
               "base-url": "jdbc:mysql://192.168.56.200:3306/cdc_test",
               "username": "root",
               "password": "123456",
               "table-names": [
                   "cdc_test.operation_logs"
               ],
               "startup.mode": "latest"
           }
       ],
       "sink": [
           {
               "plugin_name": "Rocketmq",
               "name.srv.addr": "192.168.56.200:9876",
               "topic": "test_topic",
               "tag": "operation_logs",
               "partition.key.fields": [
                   "log_id"
               ],
               "producer.send.sync": true
           }
       ]
   }
   ```
   
   ### Running Command
   
   ```yaml
   # master.yaml
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: seatunnel-cluster-master
     namespace: seatunnel
     labels:
       app.kubernetes.io/name: seatunnel-cluster-master
       app.kubernetes.io/component: master
       app.kubernetes.io/version: 2.3.10
   spec:
     replicas: 1
     strategy:
       type: RollingUpdate
       rollingUpdate:
         maxUnavailable: 25%
         maxSurge: 50%
     selector:
       matchLabels:
         app.kubernetes.io/instance: seatunnel-cluster-app
         app.kubernetes.io/version: 2.3.10
         app.kubernetes.io/name: seatunnel-cluster-master
         app.kubernetes.io/component: master
     template:
       metadata:
         labels:
           app.kubernetes.io/instance: seatunnel-cluster-app
           app.kubernetes.io/version: 2.3.10
           app.kubernetes.io/name: seatunnel-cluster-master
           app.kubernetes.io/component: master
         annotations:
           prometheus.io/path: "/hazelcast/rest/instance/metrics"
           prometheus.io/port: "5801"
           prometheus.io/scrape: "true"
           prometheus.io/role: "seatunnel-master"
       spec:
         nodeSelector:
           seatunnel: "enable"
         containers:
         - name: seatunnel-master
           image: seatunnel-oss:v1
           imagePullPolicy: IfNotPresent
           ports:
           - containerPort: 5801
             name: hazelcast
             protocol: TCP
           - containerPort: 8080
             name: master-port
             protocol: TCP
           command:
             - /bin/sh
             - -c
             - /opt/seatunnel/bin/seatunnel-cluster.sh -r master -DJvmOption="-Xms256m -Xmx512m -XX:MaxMetaspaceSize=256m -XX:+UseG1GC"
           resources:
             requests:
               cpu: "0.5"
               memory: "512Mi"
             limits:
               cpu: "1"
               memory: "1Gi"
           volumeMounts:
             - mountPath: "/opt/seatunnel/config/hazelcast-master.yaml"
               name: seatunnel-configs
               subPath: hazelcast-master.yaml
             - mountPath: "/opt/seatunnel/config/hazelcast-worker.yaml"
               name: seatunnel-configs
               subPath: hazelcast-worker.yaml
             - mountPath: "/opt/seatunnel/config/seatunnel.yaml"
               name: seatunnel-configs
               subPath: seatunnel.yaml
             - mountPath: "/opt/seatunnel/config/plugin_config"
               name: seatunnel-configs
               subPath: plugin_config
           livenessProbe:
             httpGet:
               path: /hazelcast/health/node-state
               port: 5801
               scheme: HTTP
             initialDelaySeconds: 30
             periodSeconds: 10
             timeoutSeconds: 5
             failureThreshold: 3
           readinessProbe:
             httpGet:
               path: /hazelcast/health/node-state
               port: 5801
               scheme: HTTP
             initialDelaySeconds: 15
             periodSeconds: 5
             timeoutSeconds: 3
             failureThreshold: 3
         volumes:
         - name: seatunnel-configs
           configMap:
             name: seatunnel-cluster-configs
   # worker.yaml
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: seatunnel-cluster-worker
     namespace: seatunnel
     labels:
       app.kubernetes.io/name: seatunnel-cluster-worker
       app.kubernetes.io/component: worker
       app.kubernetes.io/version: 2.3.10
   spec:
     replicas: 1
     strategy:
       type: RollingUpdate
       rollingUpdate:
         maxUnavailable: 25%
         maxSurge: 50%
     selector:
       matchLabels:
         app.kubernetes.io/instance: seatunnel-cluster-app
         app.kubernetes.io/version: 2.3.10
         app.kubernetes.io/name: seatunnel-cluster-worker
         app.kubernetes.io/component: worker
     template:
       metadata:
         labels:
           app.kubernetes.io/instance: seatunnel-cluster-app
           app.kubernetes.io/version: 2.3.10
           app.kubernetes.io/name: seatunnel-cluster-worker
           app.kubernetes.io/component: worker
         annotations:
           prometheus.io/path: "/hazelcast/rest/instance/metrics"
           prometheus.io/port: "5801"
           prometheus.io/scrape: "true"
           prometheus.io/role: "seatunnel-worker"
       spec:
         nodeSelector:
           seatunnel: "enable"
         containers:
         - name: seatunnel-worker
           image: seatunnel-oss:v1
           imagePullPolicy: IfNotPresent
           ports:
           - containerPort: 5801
             name: hazelcast
             protocol: TCP
           command:
             - /opt/seatunnel/bin/seatunnel-cluster.sh
             - -r
             - worker
           resources:
             requests:
               cpu: "1"
               memory: "1Gi"
             limits:
               cpu: "1.5"
               memory: "2Gi"
           volumeMounts:
             - mountPath: "/opt/seatunnel/config/hazelcast-master.yaml"
               name: seatunnel-configs
               subPath: hazelcast-master.yaml
             - mountPath: "/opt/seatunnel/config/hazelcast-worker.yaml"
               name: seatunnel-configs
               subPath: hazelcast-worker.yaml
             - mountPath: "/opt/seatunnel/config/seatunnel.yaml"
               name: seatunnel-configs
               subPath: seatunnel.yaml
             - mountPath: "/opt/seatunnel/config/plugin_config"
               name: seatunnel-configs
               subPath: plugin_config
           livenessProbe:
             httpGet:
               path: /hazelcast/health/node-state
               port: 5801
               scheme: HTTP
             initialDelaySeconds: 30
             periodSeconds: 10
             timeoutSeconds: 5
           readinessProbe:
             httpGet:
               path: /hazelcast/health/node-state
               port: 5801
               scheme: HTTP
             initialDelaySeconds: 15
             periodSeconds: 5
             timeoutSeconds: 3
         volumes:
         - name: seatunnel-configs
           configMap:
             name: seatunnel-cluster-configs
   ```
   
   ### Error Exception
   
   No error exceptions, stack traces, or error logs were produced while the SeaTunnel job ran.
   The SeaTunnel job starts successfully, runs with a "RUNNING" status in the Kubernetes cluster (verified via `kubectl get pods` and the SeaTunnel logs), and continuously outputs normal synchronization logs (e.g., "CDC binlog read successfully", "Message sent to RocketMQ"). There are no error-level logs such as connection failures, serialization exceptions, or sink write errors in the Pod logs (`kubectl logs <seatunnel-pod-name>`) or the SeaTunnel job logs.
   
   ### Zeta or Flink or Spark Version
   
   Zeta 2.3.10
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   <img width="2452" height="333" alt="Image" src="https://github.com/user-attachments/assets/60384145-f6d8-4924-84f8-12c56880eeab" />
   
   <img width="1658" height="694" alt="Image" src="https://github.com/user-attachments/assets/106a3d87-2b29-4278-ba3a-778b4c1b2713" />
   
   <img width="1705" height="791" alt="Image" src="https://github.com/user-attachments/assets/b133b060-160b-4bcc-8dfb-04c9a636fce1" />
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

