This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 49989babc [AMORO-3365]The table is always in committing state (#3366)
49989babc is described below
commit 49989babc96bff948fc230407c629dfbc3f5666d
Author: 7hong <[email protected]>
AuthorDate: Thu Jan 23 14:47:50 2025 +0800
[AMORO-3365]The table is always in committing state (#3366)
* [AMORO-3365]The table is always in committing state
* Close the committing process on start
* Fix unit tests
Fix unit tests
Fix unit tests
Fix unit tests
---------
Co-authored-by: Congxian Qiu <[email protected]>
---
.../amoro/server/optimizing/OptimizingQueue.java | 29 +++++++++++++++++++---
.../amoro/server/TestDefaultOptimizingService.java | 8 ++++--
2 files changed, 31 insertions(+), 6 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index db10d9a8b..2c8640d05 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -23,6 +23,7 @@ import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.exception.OptimizingClosedException;
+import org.apache.amoro.exception.PersistenceException;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteFilesInput;
@@ -120,9 +121,20 @@ public class OptimizingQueue extends PersistentBase {
if (tableRuntime.isOptimizingEnabled()) {
tableRuntime.resetTaskQuotas(
System.currentTimeMillis() -
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME);
+ // Close the committing process to avoid duplicate commit on the table.
+ if (tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING) {
+ OptimizingProcess process = tableRuntime.getOptimizingProcess();
+ if (process != null) {
+ LOG.warn(
+ "Close the committing process {} on table {}",
+ process.getProcessId(),
+ tableRuntime.getTableIdentifier());
+ process.close();
+ }
+ }
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
scheduler.addTable(tableRuntime);
- } else if (tableRuntime.getOptimizingStatus() !=
OptimizingStatus.COMMITTING) {
+ } else {
tableQueue.offer(new TableOptimizingProcess(tableRuntime));
}
} else {
@@ -569,6 +581,10 @@ public class OptimizingQueue extends PersistentBase {
try {
if (hasCommitted) {
LOG.warn("{} has already committed, give up",
tableRuntime.getTableIdentifier());
+ try {
+ persistAndSetCompleted(status == ProcessStatus.SUCCESS);
+ } catch (Exception ignored) {
+ }
throw new IllegalStateException("repeat commit, and last error " +
failedReason);
}
try {
@@ -577,10 +593,15 @@ public class OptimizingQueue extends PersistentBase {
status = ProcessStatus.SUCCESS;
endTime = System.currentTimeMillis();
persistAndSetCompleted(true);
- } catch (Exception e) {
- LOG.error("{} Commit optimizing failed ",
tableRuntime.getTableIdentifier(), e);
+ } catch (PersistenceException e) {
+ LOG.warn(
+ "{} failed to persist process completed, will retry next commit",
+ tableRuntime.getTableIdentifier(),
+ e);
+ } catch (Throwable t) {
+ LOG.error("{} Commit optimizing failed ",
tableRuntime.getTableIdentifier(), t);
status = ProcessStatus.FAILED;
- failedReason = ExceptionUtil.getErrorMessage(e, 4000);
+ failedReason = ExceptionUtil.getErrorMessage(t, 4000);
endTime = System.currentTimeMillis();
persistAndSetCompleted(false);
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 9f17b8a4c..b68660a9c 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -325,8 +325,12 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
optimizingService().completeTask(token,
buildOptimizingTaskResult(task.getTaskId()));
reload();
- assertTaskCompleted(null);
- Assertions.assertNull(optimizingService().pollTask(token, THREAD_ID));
+ // Committing process will be closed when reloading
+ Assertions.assertNull(
+
tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingProcess());
+ Assertions.assertEquals(
+ OptimizingStatus.IDLE,
+
tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
}
@Test