Copilot commented on code in PR #4077:
URL: https://github.com/apache/amoro/pull/4077#discussion_r3042604085
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java:
##########
@@ -156,25 +175,125 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
return true;
}
- private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
- CleanupOperation cleanupOperation = getCleanupOperation();
+ @VisibleForTesting
+ public TableProcessMeta createCleanupProcessInfo(
+ TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+
if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
+ return null;
+ }
+
+ TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime,
cleanupOperation);
+ persistencyHelper.beginAndPersistCleanupProcess(cleanProcessMeta);
+
+ logger.debug(
+ "Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
+ cleanProcessMeta.getProcessId(),
+ cleanProcessMeta.getTableId(),
+ cleanProcessMeta.getProcessType());
+
+ return cleanProcessMeta;
+ }
+
+ @VisibleForTesting
+ public void persistCleanupResult(
+ TableRuntime tableRuntime,
+ CleanupOperation cleanupOperation,
+ TableProcessMeta cleanProcessMeta,
+ Throwable executionError) {
+
+ if (cleanProcessMeta == null) {
return;
}
- try {
- long currentTime = System.currentTimeMillis();
- ((DefaultTableRuntime)
tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);
+ if (executionError != null) {
+ cleanProcessMeta.setStatus(ProcessStatus.FAILED);
+ String message = executionError.getMessage();
+ if (message == null) {
+ message = executionError.getClass().getName();
+ }
- logger.debug(
- "Update lastCleanTime for table {} with cleanup operation {}",
- tableRuntime.getTableIdentifier().getTableName(),
- cleanupOperation);
- } catch (Exception e) {
- logger.error(
- "Failed to update lastCleanTime for table {}",
- tableRuntime.getTableIdentifier().getTableName(),
- e);
+ cleanProcessMeta.setFailMessage(message);
+ } else {
Review Comment:
`fail_message` in `table_process` is length-constrained (e.g., Postgres enforces
`length(fail_message) <= 4096`). Persisting `Throwable.getMessage()` without
truncation can violate this constraint, and it also drops the stack trace.
Consider using the existing
`org.apache.amoro.utils.ExceptionUtil.getErrorMessage(t, 4000)` pattern, already
used by optimizing, to capture the stack trace and bound the stored length.
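
To illustrate the idea only, here is a minimal sketch of a bounded error message. `BoundedErrorMessage` is a hypothetical helper written for this comment; it is not the `ExceptionUtil` implementation, whose exact behavior should be checked in the codebase.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical helper (not Amoro's ExceptionUtil): renders a stack trace and caps its length. */
final class BoundedErrorMessage {
  private BoundedErrorMessage() {}

  /** Returns the stack trace of t, truncated to at most maxLength UTF-16 code units. */
  static String of(Throwable t, int maxLength) {
    if (maxLength < 0) {
      throw new IllegalArgumentException("maxLength must be non-negative");
    }
    StringWriter buffer = new StringWriter();
    try (PrintWriter writer = new PrintWriter(buffer)) {
      // The first line is Throwable.toString(), so a null message still yields the class name.
      t.printStackTrace(writer);
    }
    String trace = buffer.toString();
    return trace.length() <= maxLength ? trace : trace.substring(0, maxLength);
  }
}
```

With a helper of this shape, the explicit `message == null` fallback becomes unnecessary, because the rendered trace always starts with the exception class name.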
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java:
##########
@@ -123,16 +136,22 @@ private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
}
private void executeTask(TableRuntime tableRuntime) {
+ TableProcessMeta cleanProcessMeta = null;
+ CleanupOperation cleanupOperation = null;
+ Throwable executionError = null;
+
try {
if (isExecutable(tableRuntime)) {
+ cleanupOperation = getCleanupOperation();
+ cleanProcessMeta = createCleanupProcessInfo(tableRuntime,
cleanupOperation);
+
execute(tableRuntime);
- // Different tables take different amounts of time to execute the end of execute(),
- // so you need to perform the update operation separately for each table.
- persistUpdatingCleanupTime(tableRuntime);
}
- } catch (Exception e) {
- logger.error("exception when schedule for table: {}",
tableRuntime.getTableIdentifier(), e);
+ } catch (Throwable t) {
+ executionError = t;
Review Comment:
The `catch (Throwable t)` block records the error for persistence but never
logs it. Failures thrown by `execute()` or by the persistence calls are
therefore swallowed silently, making cleanup issues hard to diagnose in production.
Please add an error log (including table identifier and cleanup operation) and
consider rethrowing `Error`s if you don’t want to mask fatal JVM problems.
```suggestion
executionError = t;
logger.error(
"Failed to execute cleanup operation {} for table {}",
cleanupOperation,
tableRuntime.getTableIdentifier(),
t);
if (t instanceof Error) {
throw (Error) t;
}
```
##########
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java:
##########
@@ -262,4 +265,67 @@ public void testShouldExecuteTaskWithNoneOperation() {
boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime,
CleanupOperation.NONE);
Assert.assertTrue("Should always execute with NONE operation",
shouldExecute);
}
+
+ /**
+ * Validates that process info is correctly saved to table_process with proper status transitions.
+ */
+ @Test
+ public void testCleanupProcessPersistence() {
+ long baseTableId = 200L;
+ List<CleanupOperation> operations =
+ Arrays.asList(
+ CleanupOperation.ORPHAN_FILES_CLEANING,
+ CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
+ CleanupOperation.DATA_EXPIRING,
+ CleanupOperation.SNAPSHOTS_EXPIRING);
+
+ for (int i = 0; i < operations.size(); i++) {
+ long tableId = baseTableId + i;
+ CleanupOperation operation = operations.get(i);
+ prepareTestEnvironment(Collections.singletonList(tableId));
+
+ PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
+ ServerTableIdentifier identifier = createTableIdentifier(tableId);
+ DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier);
+
+ // Scenario 1: Create process - verify initial persisted state
+ TableProcessMeta processMeta =
+ executor.createCleanupProcessInfoForTest(tableRuntime, operation);
+ TableProcessMeta persistedRunning =
getProcessMeta(processMeta.getProcessId());
+
+ Assert.assertEquals(ProcessStatus.RUNNING, persistedRunning.getStatus());
+
+ // Scenario 2: Success completion - verify status update and timestamps
+ executor.persistCleanupResultForTest(tableRuntime, operation,
processMeta.copy(), null);
+ TableProcessMeta persistedSuccess =
getProcessMeta(processMeta.getProcessId());
+ Assert.assertEquals(ProcessStatus.SUCCESS, persistedSuccess.getStatus());
+ Assert.assertTrue(persistedSuccess.getCreateTime() <
persistedSuccess.getFinishTime());
Review Comment:
This assertion can be flaky: `createTime` and `finishTime` are both derived
from `System.currentTimeMillis()` and may be equal when the persistence call
happens within the same millisecond (or due to DB timestamp precision).
Consider asserting `<=` instead, or introducing a small deterministic delay to
guarantee `finishTime` is after `createTime`.
```suggestion
Assert.assertTrue(persistedSuccess.getCreateTime() <=
persistedSuccess.getFinishTime());
```
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java:
##########
@@ -156,25 +175,125 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
return true;
}
- private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
- CleanupOperation cleanupOperation = getCleanupOperation();
+ @VisibleForTesting
+ public TableProcessMeta createCleanupProcessInfo(
+ TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+
if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
+ return null;
+ }
+
+ TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime,
cleanupOperation);
+ persistencyHelper.beginAndPersistCleanupProcess(cleanProcessMeta);
+
+ logger.debug(
+ "Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
+ cleanProcessMeta.getProcessId(),
+ cleanProcessMeta.getTableId(),
+ cleanProcessMeta.getProcessType());
+
+ return cleanProcessMeta;
+ }
+
+ @VisibleForTesting
+ public void persistCleanupResult(
+ TableRuntime tableRuntime,
+ CleanupOperation cleanupOperation,
+ TableProcessMeta cleanProcessMeta,
+ Throwable executionError) {
+
Review Comment:
`createCleanupProcessInfo` / `persistCleanupResult` are marked
`@VisibleForTesting` but exposed as `public` methods on a production scheduler
base class. This widens the API surface unnecessarily; making them `protected`
(or package-private) would still allow test subclasses to access them while
reducing accidental external use.
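
As a rough sketch of the narrower visibility (all names here are hypothetical stand-ins, not the real scheduler classes), a test subclass can re-expose a `protected` hook under an explicit `ForTest` name, which matches how the test in this PR already calls `createCleanupProcessInfoForTest`:

```java
/** Hypothetical stand-in for the scheduler base class; names are illustrative only. */
abstract class SchedulerBase {
  // protected: reachable from subclasses and same-package code, not from arbitrary callers.
  protected String createProcessInfo(String table) {
    return "process-for-" + table;
  }
}

/** Test-side subclass that re-exposes the hook under an explicit ForTest name. */
class SchedulerForTest extends SchedulerBase {
  public String createProcessInfoForTest(String table) {
    return createProcessInfo(table);
  }
}
```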
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java:
##########
@@ -156,25 +175,125 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
return true;
}
- private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
- CleanupOperation cleanupOperation = getCleanupOperation();
+ @VisibleForTesting
+ public TableProcessMeta createCleanupProcessInfo(
+ TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+
if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
+ return null;
+ }
+
+ TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime,
cleanupOperation);
+ persistencyHelper.beginAndPersistCleanupProcess(cleanProcessMeta);
+
+ logger.debug(
+ "Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
+ cleanProcessMeta.getProcessId(),
+ cleanProcessMeta.getTableId(),
+ cleanProcessMeta.getProcessType());
+
+ return cleanProcessMeta;
+ }
+
+ @VisibleForTesting
+ public void persistCleanupResult(
+ TableRuntime tableRuntime,
+ CleanupOperation cleanupOperation,
+ TableProcessMeta cleanProcessMeta,
+ Throwable executionError) {
+
+ if (cleanProcessMeta == null) {
return;
}
- try {
- long currentTime = System.currentTimeMillis();
- ((DefaultTableRuntime)
tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);
+ if (executionError != null) {
+ cleanProcessMeta.setStatus(ProcessStatus.FAILED);
+ String message = executionError.getMessage();
+ if (message == null) {
+ message = executionError.getClass().getName();
+ }
- logger.debug(
- "Update lastCleanTime for table {} with cleanup operation {}",
- tableRuntime.getTableIdentifier().getTableName(),
- cleanupOperation);
- } catch (Exception e) {
- logger.error(
- "Failed to update lastCleanTime for table {}",
- tableRuntime.getTableIdentifier().getTableName(),
- e);
+ cleanProcessMeta.setFailMessage(message);
+ } else {
+ cleanProcessMeta.setStatus(ProcessStatus.SUCCESS);
+ }
+
+ long endTime = System.currentTimeMillis();
+ persistencyHelper.persistAndSetCompleted(
+ tableRuntime, cleanupOperation, cleanProcessMeta, endTime);
+
+ logger.debug(
+ "Successfully updated lastCleanTime and cleanupProcess for table {} with cleanup operation {}",
+ tableRuntime.getTableIdentifier().getTableName(),
+ cleanupOperation);
+ }
+
+ private TableProcessMeta buildProcessMeta(
+ TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+ TableProcessMeta cleanProcessMeta = new TableProcessMeta();
+ cleanProcessMeta.setTableId(tableRuntime.getTableIdentifier().getId());
+ cleanProcessMeta.setProcessId(idGenerator.generateId());
+ cleanProcessMeta.setExternalProcessIdentifier(EXTERNAL_PROCESS_IDENTIFIER);
+ cleanProcessMeta.setStatus(ProcessStatus.RUNNING);
+ cleanProcessMeta.setProcessType(cleanupOperation.name());
+ cleanProcessMeta.setProcessStage(CLEANUP_PROCESS_STAGE);
+ cleanProcessMeta.setExecutionEngine(CLEANUP_EXECUTION_ENGINE);
+ cleanProcessMeta.setRetryNumber(0);
+ cleanProcessMeta.setCreateTime(System.currentTimeMillis());
+ cleanProcessMeta.setProcessParameters(new HashMap<>());
+ cleanProcessMeta.setSummary(new HashMap<>());
+
+ return cleanProcessMeta;
+ }
+
+ private static class PersistencyHelper extends PersistentBase {
+
+ public PersistencyHelper() {}
+
+ private void beginAndPersistCleanupProcess(TableProcessMeta meta) {
Review Comment:
The inner helper is named `PersistencyHelper`, while the surrounding
package/classes use `persistence` terminology (e.g., `PersistentBase`,
`server.persistence`). Renaming to `PersistenceHelper` would improve
consistency and avoid confusion when searching for persistence-related
utilities.
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java:
##########
@@ -48,6 +55,12 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private static final long START_DELAY = 10 * 1000L;
+ private static final String CLEANUP_EXECUTION_ENGINE = "AMORO";
+ private static final String CLEANUP_PROCESS_STAGE = "CLEANUP";
+ private static final String EXTERNAL_PROCESS_IDENTIFIER = "";
+
+ private final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator();
Review Comment:
`SnowflakeIdGenerator` is instantiated per `PeriodicTableScheduler`
instance. Because the generator’s uniqueness relies on per-instance sequence
state (and defaults to machineId=0), multiple schedulers generating IDs within
the same time slice can produce duplicate `processId`s and violate the
`table_process.process_id` PK. Use a shared/static generator (or a centralized
ID service) so all schedulers share sequence state, or pass distinct machine
IDs per instance.
```suggestion
private static final SnowflakeIdGenerator ID_GENERATOR = new
SnowflakeIdGenerator();
```
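
To make the collision concrete, here is a toy model of the failure mode. `ToySnowflake` is a hypothetical generator written for this comment, with an assumed bit layout and an injected timestamp so the example is deterministic; it is not Amoro's `SnowflakeIdGenerator`, whose internals may differ.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy snowflake-style generator; a hypothetical stand-in, not Amoro's SnowflakeIdGenerator. */
final class ToySnowflake {
  private static final int SEQUENCE_BITS = 12; // assumed layout
  private static final int MACHINE_BITS = 10;  // assumed layout
  private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;

  private final long machineId;
  private long lastTimestamp = -1L;
  private long sequence = 0L;

  ToySnowflake(long machineId) {
    this.machineId = machineId;
  }

  /** The timestamp is passed in so the example is deterministic. */
  synchronized long nextId(long timestampMillis) {
    if (timestampMillis == lastTimestamp) {
      // Same millisecond: bump the per-instance sequence (overflow handling omitted).
      sequence = (sequence + 1) & SEQUENCE_MASK;
    } else {
      sequence = 0L;
      lastTimestamp = timestampMillis;
    }
    return (timestampMillis << (MACHINE_BITS + SEQUENCE_BITS))
        | (machineId << SEQUENCE_BITS)
        | sequence;
  }
}

final class SnowflakeCollisionDemo {
  private SnowflakeCollisionDemo() {}

  /** Two generators with the same machine id, asked for an id in the same millisecond. */
  static boolean separateInstancesCollide(long now) {
    ToySnowflake a = new ToySnowflake(0);
    ToySnowflake b = new ToySnowflake(0);
    return a.nextId(now) == b.nextId(now);
  }

  /** One shared generator: counts the distinct ids produced within a single millisecond. */
  static int distinctIdsFromShared(long now, int count) {
    ToySnowflake shared = new ToySnowflake(0);
    Set<Long> ids = new HashSet<>();
    for (int i = 0; i < count; i++) {
      ids.add(shared.nextId(now));
    }
    return ids.size();
  }
}
```

In this model, two instances with the same machine ID both start at sequence 0 and return identical IDs for the same millisecond, while a single shared instance keeps them distinct because the sequence state is common.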