Hisoka-X commented on code in PR #9661:
URL: https://github.com/apache/seatunnel/pull/9661#discussion_r2253010417
##########
seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java:
##########
@@ -110,48 +146,179 @@ public DatabendSinkWriter(
e);
}
} else {
- // use the catalog table schema to create the target table
- SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
- if (rowType == null || rowType.getFieldNames().length == 0) {
- throw new DatabendConnectorException(
- DatabendConnectorErrorCode.SCHEMA_NOT_FOUND,
- "Source table schema is empty or null");
- }
-
try {
- if (!tableExists(database, table)) {
- log.info(
- "Target table {}.{} does not exist, creating with
source schema",
- database,
- table);
- createTable(database, table, rowType);
+ if (isCdcMode) {
+ // Initialize CDC mode
+ initCdcMode(database);
} else {
- log.info("Target table {}.{} exists, verifying schema",
database, table);
- verifyTableSchema(database, table, rowType);
+ // Traditional mode
+ initTraditionalMode(database, table);
}
} catch (SQLException e) {
throw new DatabendConnectorException(
DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
- "Failed to verify/create target table: " +
e.getMessage(),
+ "Failed to initialize sink writer: " + e.getMessage(),
e);
}
+ }
+ }
- this.insertSql = generateInsertSql(database, table, rowType);
- log.info("Generated insert SQL: {}", insertSql);
- try {
- this.schemaChangeManager = new
SchemaChangeManager(databendSinkConfig);
- this.preparedStatement =
connection.prepareStatement(insertSql);
- this.preparedStatement.setQueryTimeout(executeTimeoutSec);
- log.info("PreparedStatement created successfully");
- } catch (SQLException e) {
- throw new DatabendConnectorException(
- DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
- "Failed to prepare statement: " + e.getMessage(),
- e);
- }
+ private String getCurrentTimestamp() {
+ return
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
+ }
+
+ private void initCdcMode(String database) throws SQLException {
+ // Create raw table
+ createRawTable(database);
Review Comment:
You can do the merge in the AggregatedCommitter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org