Copilot commented on code in PR #4077:
URL: https://github.com/apache/amoro/pull/4077#discussion_r3042604085


##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java:
##########
@@ -156,25 +175,125 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
     return true;
   }
 
-  private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
-    CleanupOperation cleanupOperation = getCleanupOperation();
+  @VisibleForTesting
+  public TableProcessMeta createCleanupProcessInfo(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+
     if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
+      return null;
+    }
+
+    TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime, cleanupOperation);
+    persistencyHelper.beginAndPersistCleanupProcess(cleanProcessMeta);
+
+    logger.debug(
+        "Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
+        cleanProcessMeta.getProcessId(),
+        cleanProcessMeta.getTableId(),
+        cleanProcessMeta.getProcessType());
+
+    return cleanProcessMeta;
+  }
+
+  @VisibleForTesting
+  public void persistCleanupResult(
+      TableRuntime tableRuntime,
+      CleanupOperation cleanupOperation,
+      TableProcessMeta cleanProcessMeta,
+      Throwable executionError) {
+
+    if (cleanProcessMeta == null) {
       return;
     }
 
-    try {
-      long currentTime = System.currentTimeMillis();
-      ((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);
+    if (executionError != null) {
+      cleanProcessMeta.setStatus(ProcessStatus.FAILED);
+      String message = executionError.getMessage();
+      if (message == null) {
+        message = executionError.getClass().getName();
+      }
 
-      logger.debug(
-          "Update lastCleanTime for table {} with cleanup operation {}",
-          tableRuntime.getTableIdentifier().getTableName(),
-          cleanupOperation);
-    } catch (Exception e) {
-      logger.error(
-          "Failed to update lastCleanTime for table {}",
-          tableRuntime.getTableIdentifier().getTableName(),
-          e);
+      cleanProcessMeta.setFailMessage(message);
+    } else {

Review Comment:
   `fail_message` in `table_process` is constrained (e.g., Postgres has 
`length(fail_message) <= 4096`). Persisting `Throwable.getMessage()` without 
truncation can violate this constraint (and also loses stack trace detail). 
Consider using the existing 
`org.apache.amoro.utils.ExceptionUtil.getErrorMessage(t, 4000)` pattern used by 
optimizing to both capture stack traces and bound the stored length.
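
A minimal sketch of the bounding idea, using only the JDK. The class and method names here are hypothetical and are not the `ExceptionUtil` API; the 4000 limit simply mirrors the value quoted above. It renders the stack trace and cuts the result so the stored text cannot exceed the column constraint.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

final class BoundedFailMessage {
  // Assumed limit: kept below the 4096-character constraint mentioned in the comment.
  static final int MAX_LENGTH = 4000;

  /** Renders the throwable with its stack trace and keeps at most maxLength chars. */
  static String describe(Throwable t, int maxLength) {
    StringWriter buffer = new StringWriter();
    try (PrintWriter writer = new PrintWriter(buffer)) {
      t.printStackTrace(writer);
    }
    String full = buffer.toString();
    return full.length() <= maxLength ? full : full.substring(0, maxLength);
  }

  public static void main(String[] args) {
    // An oversized message that would otherwise break the length constraint.
    Throwable error = new IllegalStateException("x".repeat(10_000));
    String stored = describe(error, MAX_LENGTH);
    System.out.println(stored.length());
  }
}
```

Because the rendered text starts with the exception class name and message, truncation drops the deepest stack frames first, which are usually the least useful ones.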



##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java:
##########
@@ -123,16 +136,22 @@ private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
   }
 
   private void executeTask(TableRuntime tableRuntime) {
+    TableProcessMeta cleanProcessMeta = null;
+    CleanupOperation cleanupOperation = null;
+    Throwable executionError = null;
+
     try {
       if (isExecutable(tableRuntime)) {
+        cleanupOperation = getCleanupOperation();
+        cleanProcessMeta = createCleanupProcessInfo(tableRuntime, cleanupOperation);
+
         execute(tableRuntime);
-        // Different tables take different amounts of time to execute the end of execute(),
-        // so you need to perform the update operation separately for each table.
-        persistUpdatingCleanupTime(tableRuntime);
       }
-    } catch (Exception e) {
-      logger.error("exception when schedule for table: {}", tableRuntime.getTableIdentifier(), e);
+    } catch (Throwable t) {
+      executionError = t;

Review Comment:
   The `catch (Throwable t)` block records the error for persistence but never 
logs it. This will silently swallow failures when `execute()` or the 
persistence calls throw, making cleanup issues hard to diagnose in production. 
Please add an error log (including table identifier and cleanup operation) and 
consider rethrowing `Error`s if you don’t want to mask fatal JVM problems.
   ```suggestion
         executionError = t;
         logger.error(
             "Failed to execute cleanup operation {} for table {}",
             cleanupOperation,
             tableRuntime.getTableIdentifier(),
             t);
         if (t instanceof Error) {
           throw (Error) t;
         }
   ```
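
For context, a self-contained sketch of how logging, recording, and rethrowing can coexist. It assumes the result is persisted from a `finally` block, which is not visible in the hunk above; `runGuarded` and `lastRecorded` are hypothetical stand-ins, and `System.err` replaces the real logger.

```java
final class CatchThrowableSketch {
  // Stands in for the persisted cleanup result (hypothetical).
  static Throwable lastRecorded;

  static void runGuarded(Runnable task) {
    Throwable executionError = null;
    try {
      task.run();
    } catch (Throwable t) {
      executionError = t;
      System.err.println("Failed to execute cleanup task: " + t);
      if (t instanceof Error) {
        // Fatal JVM problems are propagated instead of being masked.
        throw (Error) t;
      }
    } finally {
      // Runs even when the Error is rethrown, so the failure is still recorded.
      lastRecorded = executionError;
    }
  }

  public static void main(String[] args) {
    runGuarded(() -> { throw new IllegalStateException("recoverable"); });
    System.out.println(lastRecorded);
    try {
      runGuarded(() -> { throw new LinkageError("fatal"); });
    } catch (LinkageError e) {
      System.out.println(lastRecorded == e);
    }
  }
}
```

If the persistence call sits after the `try` block rather than in a `finally`, rethrowing the `Error` would skip it, so the placement matters when applying the suggestion.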



##########
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java:
##########
@@ -262,4 +265,67 @@ public void testShouldExecuteTaskWithNoneOperation() {
     boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.NONE);
     Assert.assertTrue("Should always execute with NONE operation", shouldExecute);
   }
+
+  /**
+   * Validates that process info is correctly saved to table_process with proper status transitions.
+   */
+  @Test
+  public void testCleanupProcessPersistence() {
+    long baseTableId = 200L;
+    List<CleanupOperation> operations =
+        Arrays.asList(
+            CleanupOperation.ORPHAN_FILES_CLEANING,
+            CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
+            CleanupOperation.DATA_EXPIRING,
+            CleanupOperation.SNAPSHOTS_EXPIRING);
+
+    for (int i = 0; i < operations.size(); i++) {
+      long tableId = baseTableId + i;
+      CleanupOperation operation = operations.get(i);
+      prepareTestEnvironment(Collections.singletonList(tableId));
+
+      PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
+      ServerTableIdentifier identifier = createTableIdentifier(tableId);
+      DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier);
+
+      // Scenario 1: Create process - verify initial persisted state
+      TableProcessMeta processMeta =
+          executor.createCleanupProcessInfoForTest(tableRuntime, operation);
+      TableProcessMeta persistedRunning = getProcessMeta(processMeta.getProcessId());
+
+      Assert.assertEquals(ProcessStatus.RUNNING, persistedRunning.getStatus());
+
+      // Scenario 2: Success completion - verify status update and timestamps
+      executor.persistCleanupResultForTest(tableRuntime, operation, processMeta.copy(), null);
+      TableProcessMeta persistedSuccess = getProcessMeta(processMeta.getProcessId());
+      Assert.assertEquals(ProcessStatus.SUCCESS, persistedSuccess.getStatus());
+      Assert.assertTrue(persistedSuccess.getCreateTime() < persistedSuccess.getFinishTime());

Review Comment:
   This assertion can be flaky: `createTime` and `finishTime` are both derived 
from `System.currentTimeMillis()` and may be equal when the persistence call 
happens within the same millisecond (or due to DB timestamp precision). 
Consider asserting `<=` instead, or introducing a small deterministic delay to 
guarantee `finishTime` is after `createTime`.
   ```suggestion
         Assert.assertTrue(persistedSuccess.getCreateTime() <= persistedSuccess.getFinishTime());
   ```
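
If a strict `<` is still wanted, one option is to make time injectable instead of sleeping. The sketch below is an assumption about how that could look, not the scheduler's actual design: `InjectedClockSketch`, `Meta`, `begin`, and `complete` are hypothetical names, and the clock is a plain `LongSupplier`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

final class InjectedClockSketch {
  /** Hypothetical stand-in for process metadata; only the two timestamps matter here. */
  static final class Meta {
    long createTime;
    long finishTime;
  }

  private final LongSupplier clock;

  InjectedClockSketch(LongSupplier clock) {
    this.clock = clock;
  }

  Meta begin() {
    Meta meta = new Meta();
    meta.createTime = clock.getAsLong();
    return meta;
  }

  void complete(Meta meta) {
    meta.finishTime = clock.getAsLong();
  }

  public static void main(String[] args) {
    // The fake clock advances 5 ms on every read, so timestamps never tie.
    AtomicLong fakeNow = new AtomicLong(1_000L);
    InjectedClockSketch scheduler = new InjectedClockSketch(() -> fakeNow.getAndAdd(5L));
    Meta meta = scheduler.begin();
    scheduler.complete(meta);
    System.out.println(meta.finishTime - meta.createTime); // prints 5
  }
}
```

Production code would pass `System::currentTimeMillis` as the clock. Database timestamp precision can still round values, so `<=` remains the simpler fix for the persisted fields.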



##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java:
##########
@@ -156,25 +175,125 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
     return true;
   }
 
-  private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
-    CleanupOperation cleanupOperation = getCleanupOperation();
+  @VisibleForTesting
+  public TableProcessMeta createCleanupProcessInfo(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+
     if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
+      return null;
+    }
+
+    TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime, cleanupOperation);
+    persistencyHelper.beginAndPersistCleanupProcess(cleanProcessMeta);
+
+    logger.debug(
+        "Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
+        cleanProcessMeta.getProcessId(),
+        cleanProcessMeta.getTableId(),
+        cleanProcessMeta.getProcessType());
+
+    return cleanProcessMeta;
+  }
+
+  @VisibleForTesting
+  public void persistCleanupResult(
+      TableRuntime tableRuntime,
+      CleanupOperation cleanupOperation,
+      TableProcessMeta cleanProcessMeta,
+      Throwable executionError) {
+

Review Comment:
   `createCleanupProcessInfo` / `persistCleanupResult` are marked 
`@VisibleForTesting` but exposed as `public` methods on a production scheduler 
base class. This widens the API surface unnecessarily; making them `protected` 
would still allow test subclasses to access them while reducing accidental 
external use. Package-private would also work, but only for tests that live in 
the same package as the scheduler.
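
A small sketch of the narrower visibility, with hypothetical class and method names. The base class keeps the hook `protected`, and a test-only subclass re-exposes it, similar in spirit to the `...ForTest` wrappers the test in this PR already calls.

```java
final class VisibilityDemo {
  public static void main(String[] args) {
    SchedulerSketchForTest scheduler = new SchedulerSketchForTest();
    System.out.println(scheduler.createProcessInfoForTest("orders"));
  }
}

/** Hypothetical stand-in for the production scheduler base class. */
abstract class SchedulerSketch {
  // protected: reachable from subclasses (and the same package), not from arbitrary callers.
  protected String createProcessInfo(String tableName) {
    return "process-for-" + tableName;
  }
}

/** Test-only subclass that forwards to the protected hook. */
final class SchedulerSketchForTest extends SchedulerSketch {
  String createProcessInfoForTest(String tableName) {
    return createProcessInfo(tableName);
  }
}
```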



##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java:
##########
@@ -156,25 +175,125 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
     return true;
   }
 
-  private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
-    CleanupOperation cleanupOperation = getCleanupOperation();
+  @VisibleForTesting
+  public TableProcessMeta createCleanupProcessInfo(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+
     if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
+      return null;
+    }
+
+    TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime, cleanupOperation);
+    persistencyHelper.beginAndPersistCleanupProcess(cleanProcessMeta);
+
+    logger.debug(
+        "Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
+        cleanProcessMeta.getProcessId(),
+        cleanProcessMeta.getTableId(),
+        cleanProcessMeta.getProcessType());
+
+    return cleanProcessMeta;
+  }
+
+  @VisibleForTesting
+  public void persistCleanupResult(
+      TableRuntime tableRuntime,
+      CleanupOperation cleanupOperation,
+      TableProcessMeta cleanProcessMeta,
+      Throwable executionError) {
+
+    if (cleanProcessMeta == null) {
       return;
     }
 
-    try {
-      long currentTime = System.currentTimeMillis();
-      ((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);
+    if (executionError != null) {
+      cleanProcessMeta.setStatus(ProcessStatus.FAILED);
+      String message = executionError.getMessage();
+      if (message == null) {
+        message = executionError.getClass().getName();
+      }
 
-      logger.debug(
-          "Update lastCleanTime for table {} with cleanup operation {}",
-          tableRuntime.getTableIdentifier().getTableName(),
-          cleanupOperation);
-    } catch (Exception e) {
-      logger.error(
-          "Failed to update lastCleanTime for table {}",
-          tableRuntime.getTableIdentifier().getTableName(),
-          e);
+      cleanProcessMeta.setFailMessage(message);
+    } else {
+      cleanProcessMeta.setStatus(ProcessStatus.SUCCESS);
+    }
+
+    long endTime = System.currentTimeMillis();
+    persistencyHelper.persistAndSetCompleted(
+        tableRuntime, cleanupOperation, cleanProcessMeta, endTime);
+
+    logger.debug(
+        "Successfully updated lastCleanTime and cleanupProcess for table {} with cleanup operation {}",
+        tableRuntime.getTableIdentifier().getTableName(),
+        cleanupOperation);
+  }
+
+  private TableProcessMeta buildProcessMeta(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+    TableProcessMeta cleanProcessMeta = new TableProcessMeta();
+    cleanProcessMeta.setTableId(tableRuntime.getTableIdentifier().getId());
+    cleanProcessMeta.setProcessId(idGenerator.generateId());
+    cleanProcessMeta.setExternalProcessIdentifier(EXTERNAL_PROCESS_IDENTIFIER);
+    cleanProcessMeta.setStatus(ProcessStatus.RUNNING);
+    cleanProcessMeta.setProcessType(cleanupOperation.name());
+    cleanProcessMeta.setProcessStage(CLEANUP_PROCESS_STAGE);
+    cleanProcessMeta.setExecutionEngine(CLEANUP_EXECUTION_ENGINE);
+    cleanProcessMeta.setRetryNumber(0);
+    cleanProcessMeta.setCreateTime(System.currentTimeMillis());
+    cleanProcessMeta.setProcessParameters(new HashMap<>());
+    cleanProcessMeta.setSummary(new HashMap<>());
+
+    return cleanProcessMeta;
+  }
+
+  private static class PersistencyHelper extends PersistentBase {
+
+    public PersistencyHelper() {}
+
+    private void beginAndPersistCleanupProcess(TableProcessMeta meta) {

Review Comment:
   The inner helper is named `PersistencyHelper`, while the surrounding 
package/classes use `persistence` terminology (e.g., `PersistentBase`, 
`server.persistence`). Renaming to `PersistenceHelper` would improve 
consistency and avoid confusion when searching for persistence-related 
utilities.



##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java:
##########
@@ -48,6 +55,12 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain {
   protected final Logger logger = LoggerFactory.getLogger(getClass());
 
   private static final long START_DELAY = 10 * 1000L;
+  private static final String CLEANUP_EXECUTION_ENGINE = "AMORO";
+  private static final String CLEANUP_PROCESS_STAGE = "CLEANUP";
+  private static final String EXTERNAL_PROCESS_IDENTIFIER = "";
+
+  private final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator();

Review Comment:
   `SnowflakeIdGenerator` is instantiated per `PeriodicTableScheduler` 
instance. Because the generator’s uniqueness relies on per-instance sequence 
state (and defaults to machineId=0), multiple schedulers generating IDs within 
the same time slice can produce duplicate `processId`s and violate the 
`table_process.process_id` PK. Use a shared/static generator (or a centralized 
ID service) so all schedulers share sequence state, or pass distinct machine 
IDs per instance.
   ```suggestion
     private static final SnowflakeIdGenerator ID_GENERATOR = new SnowflakeIdGenerator();
   ```
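
To illustrate the collision risk, here is a generic snowflake-style generator written from scratch. It is not Amoro's `SnowflakeIdGenerator`, whose internals are not shown in this thread; the bit widths, the injected clock, and the class name are all assumptions, and sequence-overflow handling is omitted for brevity.

```java
import java.util.function.LongSupplier;

final class SnowflakeSketch {
  private static final int MACHINE_BITS = 10;
  private static final int SEQUENCE_BITS = 12;
  private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;

  private final long machineId;
  private final LongSupplier clock;
  private long lastTimestamp = -1L;
  private long sequence = 0L;

  SnowflakeSketch(long machineId, LongSupplier clock) {
    this.machineId = machineId;
    this.clock = clock;
  }

  /** Combines timestamp, machine id, and a per-instance sequence into one id. */
  synchronized long nextId() {
    long now = clock.getAsLong();
    if (now == lastTimestamp) {
      // Same time slice: only the per-instance sequence separates ids.
      sequence = (sequence + 1) & SEQUENCE_MASK;
    } else {
      lastTimestamp = now;
      sequence = 0L;
    }
    return (now << (MACHINE_BITS + SEQUENCE_BITS)) | (machineId << SEQUENCE_BITS) | sequence;
  }

  public static void main(String[] args) {
    LongSupplier frozenClock = () -> 42L; // forces every call into one time slice

    // Two instances with the same machine id start from the same sequence.
    SnowflakeSketch first = new SnowflakeSketch(0L, frozenClock);
    SnowflakeSketch second = new SnowflakeSketch(0L, frozenClock);
    System.out.println(first.nextId() == second.nextId()); // prints true

    // One shared instance advances its sequence, so ids differ.
    SnowflakeSketch shared = new SnowflakeSketch(0L, frozenClock);
    System.out.println(shared.nextId() == shared.nextId()); // prints false
  }
}
```

Note that switching to a static `ID_GENERATOR` also means updating every `idGenerator.generateId()` call site to the new field name.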



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
