lovezhou1990 opened a new issue, #6335:
URL: https://github.com/apache/seatunnel/issues/6335

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   Scenario: real-time synchronization of MySQL change data to Elasticsearch (MySQL CDC >> ES).
   Problem description: if fewer than 10 change records have accumulated (10 is the default batch size of the ES sink), nothing is written to ES. Once the number of changes reaches 10, the batch is written to ES. (At the beginning of the task there is no problem: even a single changed record is synchronized to ES.)
   My own investigation found:
   1> Every data change enters `org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter#write`. However, if the batch size has not been reached, `org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter#bulkEsWithTry` is not executed.
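
To make the reported behavior concrete, here is a minimal sketch of a purely count-based batching writer. It is not the actual `ElasticsearchSinkWriter` source: the class `BatchingWriterSketch` and all of its members are hypothetical names used only for illustration, and an in-memory list stands in for Elasticsearch. It shows how records stay buffered until the batch size is reached unless something else, such as a checkpoint hook, triggers a flush.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a count-based batching sink writer; not SeaTunnel code. */
class BatchingWriterSketch {
    private final int maxBatchSize;
    // Records accepted by write() but not yet sent.
    private final List<String> buffer = new ArrayList<>();
    // Stands in for the documents that actually reached Elasticsearch.
    private final List<String> flushed = new ArrayList<>();

    BatchingWriterSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Buffers a record and flushes only when the batch is full. */
    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered; a checkpoint hook could call this (assumption). */
    void flush() {
        flushed.addAll(buffer);
        buffer.clear();
    }

    int pendingCount() {
        return buffer.size();
    }

    int flushedCount() {
        return flushed.size();
    }

    public static void main(String[] args) {
        BatchingWriterSketch writer = new BatchingWriterSketch(10);
        for (int i = 0; i < 3; i++) {
            writer.write("change-" + i);
        }
        // Three changes are buffered, none has been written yet.
        System.out.println("pending=" + writer.pendingCount() + " flushed=" + writer.flushedCount());
        writer.flush();
        // After an explicit flush the partial batch is written.
        System.out.println("pending=" + writer.pendingCount() + " flushed=" + writer.flushedCount());
    }
}
```

With a batch size of 10, three changes remain pending until `flush()` is called explicitly, which matches the symptom described in this report.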
   
   ### SeaTunnel Version
   
   2.3.3
   
   ### SeaTunnel Config
   
   ```conf
   env {
     # You can set flink configuration here
     execution.parallelism = 1
     job.name = "st功能调试"
     job.mode = "STREAMING"
     checkpoint.interval = 2000
     #execution.checkpoint.interval = 10000
     #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
   }
   
   source {
     MySQL-CDC {
       result_table_name = "company"
       parallelism = 1
       username = "xxxx"
       password = "xxxx"
       startup.mode = "latest"
       database-names =["test3_bigdata"]
       table-names = ["test3_bigdata.company"]
       base-url = "jdbc:mysql://xxx:3306/test3_bigdata?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&zeroDateTimeBehavior=CONVERT_TO_NULL"
     }
   }
   
   sink {
     Elasticsearch {
         source_table_name = "company"
         hosts = ["http://xxx:19200"]
         index = "changelog"
       }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   org.apache.seatunnel.example.engine.SeaTunnelEngineServerExample
   
   org.apache.seatunnel.example.engine.SeaTunnelEngineExample
   
   (both run from source)
   ```
   
   
   ### Error Exception
   
   ```log
   no
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   zeta 
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
