Hisoka-X commented on code in PR #9661:
URL: https://github.com/apache/seatunnel/pull/9661#discussion_r2252966464


##########
seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java:
##########
@@ -110,48 +146,179 @@ public DatabendSinkWriter(
                         e);
             }
         } else {
-            // use the catalog table schema to create the target table
-            SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
-            if (rowType == null || rowType.getFieldNames().length == 0) {
-                throw new DatabendConnectorException(
-                        DatabendConnectorErrorCode.SCHEMA_NOT_FOUND,
-                        "Source table schema is empty or null");
-            }
-
             try {
-                if (!tableExists(database, table)) {
-                    log.info(
-                            "Target table {}.{} does not exist, creating with source schema",
-                            database,
-                            table);
-                    createTable(database, table, rowType);
+                if (isCdcMode) {
+                    // Initialize CDC mode
+                    initCdcMode(database);
                 } else {
-                    log.info("Target table {}.{} exists, verifying schema", database, table);
-                    verifyTableSchema(database, table, rowType);
+                    // Traditional mode
+                    initTraditionalMode(database, table);
                 }
             } catch (SQLException e) {
                 throw new DatabendConnectorException(
                         DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
-                        "Failed to verify/create target table: " + e.getMessage(),
+                        "Failed to initialize sink writer: " + e.getMessage(),
                         e);
             }
+        }
+    }
 
-            this.insertSql = generateInsertSql(database, table, rowType);
-            log.info("Generated insert SQL: {}", insertSql);
-            try {
-                this.schemaChangeManager = new SchemaChangeManager(databendSinkConfig);
-                this.preparedStatement = connection.prepareStatement(insertSql);
-                this.preparedStatement.setQueryTimeout(executeTimeoutSec);
-                log.info("PreparedStatement created successfully");
-            } catch (SQLException e) {
-                throw new DatabendConnectorException(
-                        DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
-                        "Failed to prepare statement: " + e.getMessage(),
-                        e);
-            }
+    private String getCurrentTimestamp() {
+        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
+    }
+
+    private void initCdcMode(String database) throws SQLException {
+        // Create raw table
+        createRawTable(database);

Review Comment:
   If the parallelism value is more than 1, there will be more than one DatabendSinkWriter instance, so initCdcMode will be invoked more than once. Shall we create one table for each writer?
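A minimal sketch of the per-writer option this comment raises: derive the raw table name from the sink's subtask index, so parallel DatabendSinkWriter instances do not collide on a shared raw table. The helper name and naming scheme below are assumptions for illustration, not the PR's actual API.

```java
// Hypothetical helper (not part of the PR): build a raw table name that is
// unique per writer by appending the subtask index, so each parallel
// DatabendSinkWriter creates and owns its own raw table.
public class RawTableNamer {
    public static String rawTableName(String table, int subtaskIndex) {
        // e.g. ("orders", 2) -> "orders_raw_2"
        return table + "_raw_" + subtaskIndex;
    }

    public static void main(String[] args) {
        System.out.println(rawTableName("orders", 2)); // prints "orders_raw_2"
    }
}
```

Alternatively, a single shared raw table created with an idempotent CREATE TABLE IF NOT EXISTS would avoid the duplicate-creation race, though the writers would still contend on one table.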



##########
seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java:
##########
@@ -110,48 +146,179 @@ public DatabendSinkWriter(
                         e);
             }
         } else {
-            // use the catalog table schema to create the target table
-            SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
-            if (rowType == null || rowType.getFieldNames().length == 0) {
-                throw new DatabendConnectorException(
-                        DatabendConnectorErrorCode.SCHEMA_NOT_FOUND,
-                        "Source table schema is empty or null");
-            }
-
             try {
-                if (!tableExists(database, table)) {
-                    log.info(
-                            "Target table {}.{} does not exist, creating with source schema",
-                            database,
-                            table);
-                    createTable(database, table, rowType);
+                if (isCdcMode) {
+                    // Initialize CDC mode
+                    initCdcMode(database);
                 } else {
-                    log.info("Target table {}.{} exists, verifying schema", database, table);
-                    verifyTableSchema(database, table, rowType);
+                    // Traditional mode
+                    initTraditionalMode(database, table);
                 }
             } catch (SQLException e) {
                 throw new DatabendConnectorException(
                         DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
-                        "Failed to verify/create target table: " + e.getMessage(),
+                        "Failed to initialize sink writer: " + e.getMessage(),
                         e);
             }
+        }
+    }
 
-            this.insertSql = generateInsertSql(database, table, rowType);
-            log.info("Generated insert SQL: {}", insertSql);
-            try {
-                this.schemaChangeManager = new SchemaChangeManager(databendSinkConfig);
-                this.preparedStatement = connection.prepareStatement(insertSql);
-                this.preparedStatement.setQueryTimeout(executeTimeoutSec);
-                log.info("PreparedStatement created successfully");
-            } catch (SQLException e) {
-                throw new DatabendConnectorException(
-                        DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
-                        "Failed to prepare statement: " + e.getMessage(),
-                        e);
-            }
+    private String getCurrentTimestamp() {
+        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
+    }
+
+    private void initCdcMode(String database) throws SQLException {
+        // Create raw table
+        createRawTable(database);
+
+        // Create stream on raw table
+        createStream(database);
+
+        // Prepare statement for inserting into raw table
+        String insertRawSql = generateInsertRawSql(database);
+        this.cdcPreparedStatement = connection.prepareStatement(insertRawSql);
+        this.cdcPreparedStatement.setQueryTimeout(executeTimeoutSec);
+        log.info("CDC PreparedStatement created: {}", insertRawSql);
+
+        // Start merge scheduler
+        startMergeScheduler();

Review Comment:
   We can do the merge in the prepareCommit method; there is no need to create a scheduler.
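A rough sketch of the suggested direction: build the MERGE statement once and execute it from prepareCommit() on each checkpoint, instead of from a background scheduler. The `_raw` table suffix and the `UPDATE * / INSERT *` clauses are assumptions for illustration; Databend's actual MERGE syntax and the connector's real column mapping should be checked against the PR.

```java
// Hypothetical sketch (not the PR's code): SQL for a checkpoint-aligned merge
// that prepareCommit() could execute synchronously, replacing the scheduler.
public class CdcMergeSqlBuilder {
    public static String mergeSql(String database, String table, String conflictKey) {
        String target = database + "." + table;
        String raw = database + "." + table + "_raw";
        return "MERGE INTO " + target + " AS t USING " + raw + " AS s"
                + " ON t." + conflictKey + " = s." + conflictKey
                + " WHEN MATCHED THEN UPDATE *"
                + " WHEN NOT MATCHED THEN INSERT *";
    }
}
```

Running the merge inside prepareCommit ties it to the checkpoint cycle, so delivery semantics no longer depend on a timer racing the commit.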



##########
seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkOptions.java:
##########
@@ -47,4 +47,28 @@ public class DatabendSinkOptions {
                     .intType()
                     .defaultValue(300)
                    .withDescription("The timeout seconds for Databend client execution");
+
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription("Batch size for CDC merge operations");
+
+    public static final Option<Integer> INTERVAL =
+            Options.key("interval")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription("Interval in seconds for CDC merge operations");
+
+    public static final Option<String> CONFLICT_KEY =
+            Options.key("conflict_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Conflict key for CDC merge operations");
+
+    public static final Option<Boolean> ALLOW_DELETE =
+            Options.key("allow_delete")

Review Comment:
   ```suggestion
               Options.key("enable_delete")
   ```



##########
seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendOptions.java:
##########
@@ -97,4 +97,10 @@ public class DatabendOptions {
                     .booleanType()
                     .defaultValue(true)
                     .withDescription("Whether to auto commit for sink");
+
+    public static final Option<String> CONFLICT_KEY =
+            Options.key("conflict_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The conflict key for sink, used in upsert mode");

Review Comment:
   Why not get the primary key from Databend as the conflict_key?
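A hedged sketch of the fallback this comment suggests: use the user-supplied conflict_key when present, otherwise fall back to primary-key columns discovered from table metadata. The helper below is illustrative; how the primary key is actually fetched (from SeaTunnel's catalog table or a Databend metadata query) is an open question for the PR.

```java
import java.util.List;

// Hypothetical helper (not the connector's real API): resolve the conflict
// key, preferring explicit configuration over discovered primary-key columns.
public class ConflictKeyResolver {
    public static String resolve(String configured, List<String> primaryKeyColumns) {
        if (configured != null && !configured.isEmpty()) {
            return configured;
        }
        if (primaryKeyColumns == null || primaryKeyColumns.isEmpty()) {
            throw new IllegalArgumentException(
                    "conflict_key is not configured and no primary key was found");
        }
        // Composite keys join into a comma-separated column list.
        return String.join(",", primaryKeyColumns);
    }
}
```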



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to