xxubai commented on code in PR #4116:
URL: https://github.com/apache/amoro/pull/4116#discussion_r2915545921
##########
amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java:
##########
@@ -188,6 +191,24 @@ public static TableProcessMeta fromTableProcessStore(TableProcessStore tableProc
return tableProcessMeta;
}
+ public static TableProcessMeta createProcessMeta(TableProcess process) {
+ TableProcessMeta tableProcessMeta = new TableProcessMeta();
+ tableProcessMeta.setProcessId(idGenerator.generateId());
+ tableProcessMeta.setTableId(process.getTableIdentifier().getId());
+ tableProcessMeta.setExternalProcessIdentifier("");
+ tableProcessMeta.setStatus(ProcessStatus.PENDING);
+ tableProcessMeta.setProcessType(process.getAction().getName());
+ tableProcessMeta.setProcessStage(process.getProcessStage());
+ tableProcessMeta.setExecutionEngine(process.getExecutionEngine());
+ tableProcessMeta.setRetryNumber(1);
Review Comment:
This means every new process starts with retry count 1 instead of 0. Is this
intentional?
##########
amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java:
##########
@@ -249,37 +217,35 @@ private void executeOrTraceProcess(TableProcess process) {
"Submit table process {} to engine {}, process id:{}",
process,
executeEngine.engineType(),
- process.getId());
+ store.getProcessId());
}
/**
* Cancel the given process and untrack it.
*
* @param process table process
*/
- private void cancelProcess(TableProcess process) {
+ private void cancelProcess(TableProcessStore store, TableProcess process) {
- process
- .store()
- .tryTransitState(
- ProcessStatus.CANCELED,
- ProcessEvent.CANCEL_REQUESTED,
- process.getExternalProcessIdentifier(),
- "Gracefully Canceled.",
- process.getProcessParameters(),
- process.getSummary());
- untrackTableProcessInstance(process.getTableRuntime().getTableIdentifier(), process.getId());
+ store.tryTransitState(
+ ProcessStatus.CANCELED,
+ ProcessEvent.CANCEL_REQUESTED,
+ store.getExternalProcessIdentifier(),
+ "Gracefully Canceled.",
+ process.getProcessParameters(),
+ process.getSummary());
+ untrackTableProcessInstance(
+ process.getTableRuntime().getTableIdentifier(), store.getProcessId());
- ExecuteEngine executeEngine =
- executeEngines.get(EngineType.of(process.store().getExecutionEngine()));
+ ExecuteEngine executeEngine = executeEngines.get(EngineType.of(process.getExecutionEngine()));
Review Comment:
In `executeOrTraceProcess`:
`executeEngines.get(EngineType.of(store.getExecutionEngine()))`
In `cancelProcess`:
`executeEngines.get(EngineType.of(process.getExecutionEngine()))`
The two paths should resolve the engine from the same source. In the cancel
path, prefer the store's persisted engine value, since it reflects the state
at registration time.
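For illustration only, a minimal sketch of routing both paths through one lookup helper. Everything except `getExecutionEngine()` and `engineType()` is a stand-in and not code from this PR: `resolveEngine` and `register` are hypothetical names, and the map is keyed by a plain `String` rather than `EngineType` to keep the sketch self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the PR's types; not the real Amoro classes.
class EngineLookupSketch {
  interface TableProcessStore {
    String getExecutionEngine();
  }

  interface ExecuteEngine {
    String engineType();
  }

  // Keyed by engine name here; the real code keys by EngineType.of(...).
  private final Map<String, ExecuteEngine> executeEngines = new HashMap<>();

  void register(ExecuteEngine engine) {
    executeEngines.put(engine.engineType(), engine);
  }

  // Single helper (hypothetical) shared by the submit and cancel paths,
  // always reading the persisted engine name from the store.
  ExecuteEngine resolveEngine(TableProcessStore store) {
    ExecuteEngine engine = executeEngines.get(store.getExecutionEngine());
    if (engine == null) {
      throw new IllegalStateException(
          "No engine registered for " + store.getExecutionEngine());
    }
    return engine;
  }
}
```

With a helper like this, `executeOrTraceProcess` and `cancelProcess` cannot drift apart on where the engine name comes from.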
##########
amoro-common/src/main/java/org/apache/amoro/TableRuntime.java:
##########
@@ -59,6 +59,13 @@ public interface TableRuntime {
*/
ServerTableIdentifier getTableIdentifier();
+ /**
+ * Load the current table instance for this runtime.
+ *
+ * <p>This method is mainly intended for in-AMS processes.
+ */
+ AmoroTable<?> loadTable();
Review Comment:
Adding `loadTable()` to the `TableRuntime` interface in `amoro-common` ties the
runtime abstraction to table loading, which is an AMS-level concern. Consider a
separate `TableLoader` interface or keeping it on `DefaultTableRuntime` only.
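A rough sketch of the first option, under stated assumptions: the types below are simplified stand-ins (the identifier is a `String`, `AmoroTable` is reduced to a single method), and `TableLoader`, `SimpleTable`, and `loadIfSupported` are hypothetical names, not existing Amoro APIs.

```java
// Hypothetical sketch; simplified stand-ins for the real Amoro types.
class TableLoaderSketch {
  // Stand-in for org.apache.amoro.AmoroTable, reduced to one method.
  interface AmoroTable<T> {
    String name();
  }

  // Would stay in amoro-common, without any table-loading method.
  interface TableRuntime {
    String getTableIdentifier();
  }

  // Hypothetical AMS-side interface that carries loadTable().
  interface TableLoader {
    AmoroTable<?> loadTable();
  }

  static class SimpleTable implements AmoroTable<Object> {
    private final String name;

    SimpleTable(String name) {
      this.name = name;
    }

    @Override
    public String name() {
      return name;
    }
  }

  // The AMS implementation opts in to loading by implementing both interfaces.
  static class DefaultTableRuntime implements TableRuntime, TableLoader {
    private final String tableIdentifier;

    DefaultTableRuntime(String tableIdentifier) {
      this.tableIdentifier = tableIdentifier;
    }

    @Override
    public String getTableIdentifier() {
      return tableIdentifier;
    }

    @Override
    public AmoroTable<?> loadTable() {
      return new SimpleTable(tableIdentifier);
    }
  }

  // In-AMS callers check for the capability instead of relying on TableRuntime.
  static AmoroTable<?> loadIfSupported(TableRuntime runtime) {
    if (runtime instanceof TableLoader) {
      return ((TableLoader) runtime).loadTable();
    }
    throw new UnsupportedOperationException(
        "Runtime cannot load tables: " + runtime.getTableIdentifier());
  }
}
```

The trade-off is an `instanceof` check (or an injected `TableLoader`) at in-AMS call sites, in exchange for keeping `amoro-common` free of loading concerns.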