This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cacf03c18 [hotfix] Refactor code in FileStoreCommitImpl
6cacf03c18 is described below

commit 6cacf03c18148fc82b3f96d5d87b3945c9612bdc
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jun 9 12:07:42 2025 +0800

    [hotfix] Refactor code in FileStoreCommitImpl
---
 .../paimon/operation/FileStoreCommitImpl.java      | 73 ++++++++++++----------
 .../org/apache/paimon/schema/SchemaManager.java    |  4 ++
 2 files changed, 43 insertions(+), 34 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index f85bf6b2c7..852776d2fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -713,22 +713,22 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
         if (!commitMessages.isEmpty()) {
             List<String> msg = new ArrayList<>();
-            if (appendTableFiles.size() > 0) {
+            if (!appendTableFiles.isEmpty()) {
                 msg.add(appendTableFiles.size() + " append table files");
             }
-            if (appendChangelog.size() > 0) {
+            if (!appendChangelog.isEmpty()) {
                 msg.add(appendChangelog.size() + " append Changelogs");
             }
-            if (compactTableFiles.size() > 0) {
+            if (!compactTableFiles.isEmpty()) {
                 msg.add(compactTableFiles.size() + " compact table files");
             }
-            if (compactChangelog.size() > 0) {
+            if (!compactChangelog.isEmpty()) {
                 msg.add(compactChangelog.size() + " compact Changelogs");
             }
-            if (appendHashIndexFiles.size() > 0) {
+            if (!appendHashIndexFiles.isEmpty()) {
                 msg.add(appendHashIndexFiles.size() + " append hash index files");
             }
-            if (compactDvIndexFiles.size() > 0) {
+            if (!compactDvIndexFiles.isEmpty()) {
                 msg.add(compactDvIndexFiles.size() + " compact dv index files");
             }
             LOG.info("Finished collecting changes, including: {}", String.join(", ", msg));
@@ -989,7 +989,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         indexManifestFile.writeIndexFiles(oldIndexManifest, indexFiles, bucketMode);
             }
 
-            long latestSchemaId = schemaManager.latest().get().id();
+            long latestSchemaId =
+                    schemaManager
+                            .latestOrThrow("Cannot get latest schema for table " + tableName)
+                            .id();
 
             // write new stats or inherit from the previous snapshot
             String statsFileName = null;
@@ -1261,8 +1264,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                     baseCommitUser,
                                     baseEntries,
                                     changes,
-                                    e,
-                                    50);
+                                    e);
                     LOG.warn("", conflictException.getLeft());
                     throw conflictException.getRight();
                 };
@@ -1310,8 +1312,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                     baseCommitUser,
                                     baseEntries,
                                     changes,
-                                    null,
-                                    50);
+                                    null);
 
                     LOG.warn("", conflictException.getLeft());
                     throw conflictException.getRight();
@@ -1332,29 +1333,33 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         tableName);
             }
         } catch (Throwable e) {
-            if (partitionExpire != null && partitionExpire.isValueExpiration()) {
-                Set<BinaryRow> deletedPartitions = new HashSet<>();
-                for (SimpleFileEntry entry : mergedEntries) {
-                    if (entry.kind() == FileKind.DELETE) {
-                        deletedPartitions.add(entry.partition());
-                    }
-                }
-                if (partitionExpire.isValueAllExpired(deletedPartitions)) {
-                    List<String> expiredPartitions =
-                            deletedPartitions.stream()
-                                    .map(
-                                            partition ->
-                                                    partToSimpleString(
-                                                            partitionType, partition, "-", 200))
-                                    .collect(Collectors.toList());
-                    throw new RuntimeException(
-                            "You are writing data to expired partitions, and you can filter this data to avoid job failover."
-                                    + " Otherwise, continuous expired records will cause the job to failover restart continuously."
-                                    + " Expired partitions are: "
-                                    + expiredPartitions);
+            assertConflictForPartitionExpire(mergedEntries);
+            conflictHandler.accept(e);
+        }
+    }
+
+    private void assertConflictForPartitionExpire(Collection<SimpleFileEntry> mergedEntries) {
+        if (partitionExpire != null && partitionExpire.isValueExpiration()) {
+            Set<BinaryRow> deletedPartitions = new HashSet<>();
+            for (SimpleFileEntry entry : mergedEntries) {
+                if (entry.kind() == FileKind.DELETE) {
+                    deletedPartitions.add(entry.partition());
                 }
             }
-            conflictHandler.accept(e);
+            if (partitionExpire.isValueAllExpired(deletedPartitions)) {
+                List<String> expiredPartitions =
+                        deletedPartitions.stream()
+                                .map(
+                                        partition ->
+                                                partToSimpleString(
+                                                        partitionType, partition, "-", 200))
+                                .collect(Collectors.toList());
+                throw new RuntimeException(
+                        "You are writing data to expired partitions, and you can filter this data to avoid job failover."
+                                + " Otherwise, continuous expired records will cause the job to failover restart continuously."
+                                + " Expired partitions are: "
+                                + expiredPartitions);
+            }
         }
     }
 
@@ -1368,8 +1373,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             String baseCommitUser,
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> changes,
-            Throwable cause,
-            int maxEntry) {
+            Throwable cause) {
         String possibleCauses =
                 String.join(
                         "\n",
@@ -1414,6 +1418,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         cause);
 
         RuntimeException simplifiedException;
+        int maxEntry = 50;
         if (baseEntries.size() > maxEntry || changes.size() > maxEntry) {
             baseEntriesString =
                     "Base entries are:\n"
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 21484cfc84..36a18e26a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -119,6 +119,10 @@ public class SchemaManager implements Serializable {
         }
     }
 
+    public TableSchema latestOrThrow(String message) {
+        return latest().orElseThrow(() -> new RuntimeException(message));
+    }
+
     public long earliestCreationTime() {
         try {
             long earliest = 0;

Reply via email to