This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 49989babc [AMORO-3365]The table is always in committing state (#3366)
49989babc is described below

commit 49989babc96bff948fc230407c629dfbc3f5666d
Author: 7hong <[email protected]>
AuthorDate: Thu Jan 23 14:47:50 2025 +0800

    [AMORO-3365]The table is always in committing state (#3366)
    
    * [AMORO-3365]The table is always in committing state
    
    * Close the committing process on start
    
    * Fix unit tests
    
    Fix unit tests
    
    Fix unit tests
    
    Fix unit tests
    
    ---------
    
    Co-authored-by: Congxian Qiu <[email protected]>
---
 .../amoro/server/optimizing/OptimizingQueue.java   | 29 +++++++++++++++++++---
 .../amoro/server/TestDefaultOptimizingService.java |  8 ++++--
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index db10d9a8b..2c8640d05 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -23,6 +23,7 @@ import org.apache.amoro.OptimizerProperties;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.api.OptimizingTaskId;
 import org.apache.amoro.exception.OptimizingClosedException;
+import org.apache.amoro.exception.PersistenceException;
 import org.apache.amoro.optimizing.MetricsSummary;
 import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.optimizing.RewriteFilesInput;
@@ -120,9 +121,20 @@ public class OptimizingQueue extends PersistentBase {
     if (tableRuntime.isOptimizingEnabled()) {
       tableRuntime.resetTaskQuotas(
           System.currentTimeMillis() - 
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME);
+      // Close the committing process to avoid duplicate commit on the table.
+      if (tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING) {
+        OptimizingProcess process = tableRuntime.getOptimizingProcess();
+        if (process != null) {
+          LOG.warn(
+              "Close the committing process {} on table {}",
+              process.getProcessId(),
+              tableRuntime.getTableIdentifier());
+          process.close();
+        }
+      }
       if (!tableRuntime.getOptimizingStatus().isProcessing()) {
         scheduler.addTable(tableRuntime);
-      } else if (tableRuntime.getOptimizingStatus() != 
OptimizingStatus.COMMITTING) {
+      } else {
         tableQueue.offer(new TableOptimizingProcess(tableRuntime));
       }
     } else {
@@ -569,6 +581,10 @@ public class OptimizingQueue extends PersistentBase {
       try {
         if (hasCommitted) {
           LOG.warn("{} has already committed, give up", 
tableRuntime.getTableIdentifier());
+          try {
+            persistAndSetCompleted(status == ProcessStatus.SUCCESS);
+          } catch (Exception ignored) {
+          }
           throw new IllegalStateException("repeat commit, and last error " + 
failedReason);
         }
         try {
@@ -577,10 +593,15 @@ public class OptimizingQueue extends PersistentBase {
           status = ProcessStatus.SUCCESS;
           endTime = System.currentTimeMillis();
           persistAndSetCompleted(true);
-        } catch (Exception e) {
-          LOG.error("{} Commit optimizing failed ", 
tableRuntime.getTableIdentifier(), e);
+        } catch (PersistenceException e) {
+          LOG.warn(
+              "{} failed to persist process completed, will retry next commit",
+              tableRuntime.getTableIdentifier(),
+              e);
+        } catch (Throwable t) {
+          LOG.error("{} Commit optimizing failed ", 
tableRuntime.getTableIdentifier(), t);
           status = ProcessStatus.FAILED;
-          failedReason = ExceptionUtil.getErrorMessage(e, 4000);
+          failedReason = ExceptionUtil.getErrorMessage(t, 4000);
           endTime = System.currentTimeMillis();
           persistAndSetCompleted(false);
         }
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 9f17b8a4c..b68660a9c 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -325,8 +325,12 @@ public class TestDefaultOptimizingService extends AMSTableTestBase {
     optimizingService().completeTask(token, 
buildOptimizingTaskResult(task.getTaskId()));
 
     reload();
-    assertTaskCompleted(null);
-    Assertions.assertNull(optimizingService().pollTask(token, THREAD_ID));
+    // Committing process will be closed when reloading
+    Assertions.assertNull(
+        
tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingProcess());
+    Assertions.assertEquals(
+        OptimizingStatus.IDLE,
+        
tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
   }
 
   @Test

Reply via email to