qiaoborui opened a new issue, #9806: URL: https://github.com/apache/seatunnel/issues/9806
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

When using MySQL-CDC as the source and Elasticsearch as the sink, timestamp fields from MySQL are serialized in ISO format (containing 'T'), which causes a format mismatch with Elasticsearch date fields.

## Root Cause

In ElasticsearchRowSerializer.java, lines 204-206, the convertValue method handles Temporal objects by directly calling toString():

```java
if (value instanceof Temporal) {
    // jackson not support jdk8 new time api
    return value.toString();
}
```

MySQL-CDC converts timestamp fields to LocalDateTime objects. When serialized, LocalDateTime.toString() produces ISO format like "2023-01-01T12:34:56", but Elasticsearch date fields may expect a different format, such as "2023-01-01 12:34:56".

## Steps to Reproduce

1. Set up a MySQL-CDC source with a table containing timestamp fields
2. Configure the Elasticsearch sink with a date field mapping
3. Run the pipeline
4. Observe timestamp format mismatch errors in Elasticsearch

## Suggested Solution

Modify the convertValue method in ElasticsearchRowSerializer.java to handle LocalDateTime specifically:

```java
private Object convertValue(String fieldName, Object value) {
    if (value instanceof Temporal) {
        if (value instanceof LocalDateTime) {
            // Format for ES date field compatibility
            return ((LocalDateTime) value).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        }
        // jackson not support jdk8 new time api - keep original logic for other Temporal types
        return value.toString();
    }
    // ... rest of the logic remains unchanged
}
```

### SeaTunnel Version

2.3.11

### SeaTunnel Config

```conf
{
  "env": {
    "parallelism": 1,
    "job.mode": "STREAMING",
    "job.name": "xx-x-xx",
    "checkpoint.timeout": 600000
  },
  "source": [
    {
      "plugin_name": "MySQL-CDC",
      "base-url": "jdbc:mysql://m2-m:3504/kxxx",
      "username": "xxxx",
      "password": "xxxxx",
      "startup.specific-offset.file": "mysql-bin.006461",
      "startup.specific-offset.pos": 47331234,
      "table-names": ["xxx.xx"],
      "startup.mode": "specific"
    }
  ],
  "sink": [
    {
      "plugin_name": "Elasticsearch",
      "hosts": ["http://xxxxxx:9201"],
      "tls_verify_certificate": false,
      "tls_verify_hostname": false,
      "schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST",
      "data_save_mode": "APPEND_DATA",
      "index": "xx-index-xx",
      "index_type": "doc",
      "primary_keys": ["id"]
    }
  ]
}
```

### Running Command

```shell
./bin/seatunnel-cluster.sh -d
```

### Error Exception

```log
Timestamp fields should be formatted in a way that's compatible with Elasticsearch date field expectations.
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
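The difference between the current and the suggested behavior described in the issue can be reproduced outside SeaTunnel with a minimal sketch. The class `TemporalFormatSketch`, the constant `ES_DATE_TIME`, and the helper `convertTemporal` are hypothetical names used only for illustration; they stand in for the Temporal branch of `convertValue` and are not the actual SeaTunnel code.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.Temporal;

class TemporalFormatSketch {
    // Pattern taken from the suggested solution in the issue (hypothetical constant name).
    static final DateTimeFormatter ES_DATE_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Hypothetical stand-in for the Temporal branch of convertValue.
    static Object convertTemporal(Object value) {
        if (value instanceof LocalDateTime) {
            // Proposed behavior: space-separated date and time, no 'T'.
            return ((LocalDateTime) value).format(ES_DATE_TIME);
        }
        if (value instanceof Temporal) {
            // Existing behavior, kept for all other Temporal types.
            return value.toString();
        }
        // Non-temporal values pass through untouched in this sketch.
        return value;
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2023, 1, 1, 12, 34, 56);
        System.out.println(ts);                  // 2023-01-01T12:34:56
        System.out.println(convertTemporal(ts)); // 2023-01-01 12:34:56
    }
}
```

Note that a fixed `yyyy-MM-dd HH:mm:ss` pattern drops any fractional seconds carried by the LocalDateTime, so whether a hardcoded pattern or a configurable one is appropriate depends on the date format declared in the target index mapping.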
