This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 11797229e8 [spark] Optimize compact for data-evolution table, commit multiple times to avoid out-of-memory errors (#6907)
11797229e8 is described below
commit 11797229e8da861a41ec856cd62db9fd0141cb01
Author: YeJunHao <[email protected]>
AuthorDate: Fri Dec 26 16:52:40 2025 +0800
[spark] Optimize compact for data-evolution table, commit multiple times to avoid out-of-memory errors (#6907)
---
.../org/apache/paimon/spark/procedure/CompactProcedure.java | 11 +++--------
1 file changed, 3 insertions(+), 8 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 75a99fc9b3..7785735d04 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -489,7 +489,6 @@ public class CompactProcedure extends BaseProcedure {
new DataEvolutionCompactCoordinator(table, partitionPredicate,
false);
CommitMessageSerializer messageSerializerser = new
CommitMessageSerializer();
String commitUser =
createCommitUser(table.coreOptions().toConfiguration());
- List<CommitMessage> messages = new ArrayList<>();
try {
while (true) {
compactionTasks = compactCoordinator.plan();
@@ -560,13 +559,15 @@ public class CompactProcedure extends BaseProcedure {
return
messagesBytes.iterator();
});
+ List<CommitMessage> messages = new ArrayList<>();
List<byte[]> serializedMessages =
commitMessageJavaRDD.collect();
- try {
+ try (TableCommitImpl commit = table.newCommit(commitUser)) {
for (byte[] serializedMessage : serializedMessages) {
messages.add(
messageSerializerser.deserialize(
messageSerializerser.getVersion(),
serializedMessage));
}
+ commit.commit(messages);
} catch (Exception e) {
throw new RuntimeException("Deserialize commit message
failed", e);
}
@@ -574,12 +575,6 @@ public class CompactProcedure extends BaseProcedure {
} catch (EndOfScanException e) {
LOG.info("Catching EndOfScanException, the compact job is
finishing.");
}
-
- try (TableCommitImpl commit = table.newCommit(commitUser)) {
- commit.commit(messages);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
}
private Set<BinaryRow> getHistoryPartition(