yzeng1618 opened a new issue, #10092: URL: https://github.com/apache/seatunnel/issues/10092
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

When using a JDBC source with only `query` configured and a Doris sink with the default `save_mode_create_template`, the job fails to auto-create the Doris table even though the upstream MySQL table has a primary key.

**Context**

- Source: MySQL via `JDBC` connector
- Sink: Doris via `Doris` connector
- Engine: Flink `1.20.1`, job launched as `SeaTunnelFlink` on YARN
- SeaTunnel version: `2.3.12-SNAPSHOT` (seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar)
- Source table DDL (MySQL):

```sql
CREATE TABLE IF NOT EXISTS test_pq_db.test_db_10 (
  `id` bigint(20) AUTO_INCREMENT NOT NULL,
  `name` varchar(100) DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  `sex` boolean DEFAULT NULL,
  `address` varchar(100) DEFAULT NULL,
  `telephone` char(12) DEFAULT NULL,
  `height` float DEFAULT NULL,
  `weight` double DEFAULT NULL,
  `size` decimal(10,2) DEFAULT NULL,
  `ID_number` varchar(100) DEFAULT NULL,
  `date_time` date DEFAULT NULL,
  `ts` timestamp NULL,
  PRIMARY KEY (`id`)
);

-- Doris sink uses the default save_mode_create_template:
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
UNIQUE KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
)
```

<img width="2414" height="1229" alt="Image" src="https://github.com/user-attachments/assets/3a73017d-d483-4401-a3f6-7ac03cd587ff" />

### SeaTunnel Version

seatunnel 2.3.12

### SeaTunnel Config

```conf
env {
  execution.parallelism = 1
  job.mode = "BATCH"
  job.name = "mysql_test"
}

source {
  Jdbc {
    url = "jdbc:mysql://xxxx:xxx/test_pq_db"
    driver = "com.mysql.jdbc.Driver"
    user = "root"
    password = "******"
    query = "select * from `test_pq_db`.`test_db_10` where 1 = 1"
    split.size = 5000
    fetch_size = 2000
  }
}

sink {
  Doris {
    fenodes = "xxxxx:8035"
    query-port = "9030"
    username = "******"
    password = "******"
    database = "test_pq_db"
    table = "test_db_10"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
    # schema_save_mode = CREATE_SCHEMA_WHEN_NOT_EXIST (default)
    # data_save_mode = APPEND_DATA (default)
    # save_mode_create_template = <default template with ${rowtype_primary_key}>
  }
}
```

### Running Command

```shell
# Simplified example
bin/seatunnel-flink.sh \
  --config mysql_to_doris.conf \
  --deploy-mode run-application \
  --target yarn-application
```

### Error Exception

```log
Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
	at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.handleSaveMode(SinkExecuteProcessor.java:191) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
	at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:139) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
	at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:104) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
	at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:61) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
	at org.apache.seatunnel.core.starter.flink.SeaTunnelFlink.main(SeaTunnelFlink.java:34) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_352]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_352]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_352]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_352]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.20.1.jar:1.20.1]
	... 12 more
Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[COMMON-24], ErrorDescription:[The table of test_pq_db.test_db_10 has no primary keys, but the template CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( ${rowtype_primary_key}, ${rowtype_fields} ) ENGINE=OLAP UNIQUE KEY (${rowtype_primary_key}) COMMENT '${comment}' DISTRIBUTED BY HASH (${rowtype_primary_key}) PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "in_memory" = "false", "storage_format" = "V2", "disable_auto_compaction" = "false" ) which has the place holder named ${rowtype_primary_key}. Please use the option named save_mode_create_template to specify sql template]
```

### Zeta or Flink or Spark Version

Flink: 1.20.1

### Java or Scala Version

Java: 1.8.0_352

### Screenshots

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
