This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d0654a331b3 [fix](iceberg) Fix transaction issues (#52716)
d0654a331b3 is described below
commit d0654a331b3943a8fb8510160f87aa519519dc82
Author: wuwenchi <[email protected]>
AuthorDate: Sun Jul 13 08:52:51 2025 +0800
[fix](iceberg) Fix transaction issues (#52716)
### What problem does this PR solve?
Problem Summary:
1. Within a transaction, table operations should be created through the
transaction API rather than directly on the table, and no commit should
be made when finishInsert is called.
2. The ExecutorService was not passed to the ReplacePartitions operation.
Co-authored-by: wuwenchi.wwc <[email protected]>
---
.../datasource/iceberg/IcebergTransaction.java | 24 +++++++++++++---------
1 file changed, 14 insertions(+), 10 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 95cf6c36a3b..9160c3012b5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -108,8 +108,8 @@ public class IcebergTransaction implements Transaction {
}
private void updateManifestAfterInsert(TUpdateMode updateMode) {
- PartitionSpec spec = table.spec();
- FileFormat fileFormat = IcebergUtils.getFileFormat(table);
+ PartitionSpec spec = transaction.table().spec();
+ FileFormat fileFormat =
IcebergUtils.getFileFormat(transaction.table());
List<WriteResult> pendingResults;
if (commitDataList.isEmpty()) {
@@ -122,9 +122,9 @@ public class IcebergTransaction implements Transaction {
}
if (updateMode == TUpdateMode.APPEND) {
- commitAppendTxn(table, pendingResults);
+ commitAppendTxn(pendingResults);
} else {
- commitReplaceTxn(table, pendingResults);
+ commitReplaceTxn(pendingResults);
}
}
@@ -143,9 +143,9 @@ public class IcebergTransaction implements Transaction {
return
commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
}
- private void commitAppendTxn(Table table, List<WriteResult>
pendingResults) {
+ private void commitAppendTxn(List<WriteResult> pendingResults) {
// commit append files.
- AppendFiles appendFiles =
table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
+ AppendFiles appendFiles =
transaction.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
"Should have no referenced data files for append.");
@@ -155,13 +155,15 @@ public class IcebergTransaction implements Transaction {
}
- private void commitReplaceTxn(Table table, List<WriteResult>
pendingResults) {
+ private void commitReplaceTxn(List<WriteResult> pendingResults) {
if (pendingResults.isEmpty()) {
// such as : insert overwrite table `dst_tb` select * from
`empty_tb`
// 1. if dst_tb is a partitioned table, it will return directly.
// 2. if dst_tb is an unpartitioned table, the `dst_tb` table will
be emptied.
- if (!table.spec().isPartitioned()) {
- OverwriteFiles overwriteFiles =
table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
+ if (!transaction.table().spec().isPartitioned()) {
+ OverwriteFiles overwriteFiles = transaction
+ .newOverwrite()
+ .scanManifestsWith(ops.getThreadPoolWithPreAuth());
try (CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().planFiles()) {
fileScanTasks.forEach(f ->
overwriteFiles.deleteFile(f.file()));
} catch (IOException e) {
@@ -173,7 +175,9 @@ public class IcebergTransaction implements Transaction {
}
// commit replace partitions
- ReplacePartitions appendPartitionOp = table.newReplacePartitions();
+ ReplacePartitions appendPartitionOp = transaction
+ .newReplacePartitions()
+ .scanManifestsWith(ops.getThreadPoolWithPreAuth());
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
"Should have no referenced data files.");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]