This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 48968fb4e9 [Fix][Connector-V2][Databend] Ensure CDC final merge on committer close (#10349)
48968fb4e9 is described below
commit 48968fb4e93246c2ddc5b53617373faa0cf676b8
Author: yzeng1618 <[email protected]>
AuthorDate: Fri Jan 23 16:11:16 2026 +0800
[Fix][Connector-V2][Databend] Ensure CDC final merge on committer close (#10349)
Co-authored-by: zengyi <[email protected]>
---
.../sink/DatabendSinkAggregatedCommitter.java | 53 ++++++++++++++++++----
1 file changed, 45 insertions(+), 8 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
index 0f55132fa9..8a22e78653 100644
--- a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
@@ -59,6 +59,7 @@ public class DatabendSinkAggregatedCommitter
private Connection connection;
private boolean isCdcMode;
+ private volatile boolean aborted;
// Store catalog table to access schema information
private CatalogTable catalogTable;
@@ -130,14 +131,21 @@ public class DatabendSinkAggregatedCommitter
List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos)
throws IOException {
// Perform final merge operation in CDC mode only when necessary
if (isCdcMode) {
- performMerge(aggregatedCommitInfos);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "[Instance {}] Committing aggregatedCommitInfos size: {}",
+ instanceId,
+ aggregatedCommitInfos == null ? 0 : aggregatedCommitInfos.size());
+ }
+ performMerge();
}
// Return empty list as there's no need to retry
return new ArrayList<>();
}
- private void performMerge(List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos) {
+ /** Perform merge from CDC stream to target table. */
+ private void performMerge() {
// Merge all the data from raw table to target table
String mergeSql = generateMergeSql();
log.info("[Instance {}] Executing MERGE INTO statement: {}",
instanceId, mergeSql);
@@ -214,6 +222,7 @@ public class DatabendSinkAggregatedCommitter
@Override
public void abort(List<DatabendSinkAggregatedCommitInfo>
aggregatedCommitInfos)
throws IOException {
+ aborted = true;
// In case of abort, we might want to clean up the raw table and stream
log.info("[Instance {}] Aborting Databend sink operations",
instanceId);
try {
@@ -235,16 +244,44 @@ public class DatabendSinkAggregatedCommitter
@Override
public void close() throws IOException {
+ Exception closeException = null;
try {
- if (connection != null && !connection.isClosed()) {
- connection.close();
+ if (!aborted && isCdcMode && connection != null && !connection.isClosed()) {
+ try {
+ log.info("[Instance {}] Performing final merge before closing", instanceId);
+ performMerge();
+ } catch (Exception mergeEx) {
+ log.error(
+ "[Instance {}] Final merge failed, will still close connection: {}",
+ instanceId,
+ mergeEx.getMessage(),
+ mergeEx);
+ }
}
- } catch (SQLException e) {
+ } catch (Exception e) {
+ closeException = e;
+ } finally {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ if (closeException != null) {
+ closeException.addSuppressed(e);
+ } else {
+ closeException = e;
+ }
+ }
+ }
+ }
+
+ if (closeException != null) {
throw new DatabendConnectorException(
DatabendConnectorErrorCode.CONNECT_FAILED,
- "[Instance {}] Failed to close connection in DatabendSinkAggregatedCommitter: "
- + e.getMessage(),
- e);
+ "[Instance "
+ + instanceId
+ + "] Failed to close connection in DatabendSinkAggregatedCommitter: "
+ + closeException.getMessage(),
+ closeException);
}
}
}