bugesoft opened a new issue, #6911:
URL: https://github.com/apache/seatunnel/issues/6911

   ### Search before asking
   
   - [X] I had searched in the 
[feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22)
 and found no similar feature requirement.
   
   
   ### Description
   
   For Dameng, SQLSERVR, and MYSQL, auto-increment fields are supported.When 
inserting data into Dameng's data table in batches, the following errors may 
occur:
   Caused by: java.sql.BatchUpdateException: 仅当指定列列表,且SET 
IDENTITY_INSERT为ON时,才能对自增列赋值
           at dm.jdbc.driver.DBError.throwBatchUpdateException(SourceFile:755)
           at dm.jdbc.a.a.b(SourceFile:1138)
           ...................
           at 
org.apache.seatunnel.shade.com.zaxxer.hikari.pool.ProxyStatement.executeBatch(ProxyStatement.java:127)
           at 
org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeBatch(HikariProxyPreparedStatement.java)
           at 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.executeBatch(FieldNamedPreparedStatement.java:533)
           at 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:51)
           at 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:53)
           at 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:172)
           at 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:136)
   
   Suggested solution(All operations use the same Connection):
   1)set IDENTITY_INSERT " + SCHEMA + "." + TABLENAME + " ON;
   2)batch insert : 
SimpleBatchStatementExecutor.executeBatch();-->JdbcSinkWriter.close()
   3)set IDENTITY_INSERT " + SCHEMA + "." + TABLENAME + " OFF;
   4)connection.close()
   
   Modify JdbcSinkWriter.close() as follows:
       tryOpen();
       String sql = "SELECT * FROM " + jdbcSinkConfig.getDatabase()+ "." + 
jdbcSinkConfig.getTable;
       Statement stmt = conn.createStatement();
       ResultSet rs = stmt.executeQuery(sql);
       ResultSetMetaData resultSetMetaData = rs.getMetaData();
       int columnCount = resultSetMetaData.getColumnCount();
       boolean foundAutoIncrement = false;
       for (int i = 1; i <= columnCount; i++) {
           if (resultSetMetaData.isAutoIncrement(i)) {
               foundAutoIncrement = true;
               System.out.println("Primary key column '" + 
resultSetMetaData.getColumnName(i) + "' is auto-incremented.");
           }
       }
       rs.close();
       stmt.close();
   
       if (foundAutoIncrement) {
           String setSql = "set IDENTITY_INSERT " + SCHEMA + "." + TABLENAME + 
" ON";
           Statement setStmt = conn.createStatement();
           setStmt.execute(setSql);
       }
   
       outputFormat.flush();
       if (foundAutoIncrement) {
           String setSql = "set IDENTITY_INSERT " + SCHEMA + "." + TABLENAME + 
" OFF";
           Statement setStmt = conn.createStatement();
           setStmt.execute(setSql);
       }
   
   ### Usage Scenario
   
   Applicable to Sink scenarios with auto-increment fields
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to