This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c8f66668bd Fix procedure concurrency problem caused by non-atomic procedure Id generation (#12229)
8c8f66668bd is described below

commit 8c8f66668bdde0fc4c9a969729e555e5e257b930
Author: Li Yu Heng <[email protected]>
AuthorDate: Mon Mar 25 13:23:12 2024 +0800

    Fix procedure concurrency problem caused by non-atomic procedure Id generation (#12229)
---
 .../iotdb/confignode/persistence/ProcedureInfo.java      | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
index 9a6666eee68..78e03f39ee3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
@@ -59,6 +59,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
 public class ProcedureInfo implements SnapshotProcessor {
@@ -76,7 +77,7 @@ public class ProcedureInfo implements SnapshotProcessor {
   private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =
       new ConcurrentHashMap<>();
 
-  private long lastProcId = -1;
+  private final AtomicLong lastProcId = new AtomicLong(-1);
 
   private final ProcedureFactory procedureFactory = 
ProcedureFactory.getInstance();
 
@@ -104,7 +105,8 @@ public class ProcedureInfo implements SnapshotProcessor {
       LOGGER.error("Load procedure wal failed.", e);
     }
     procedureList.forEach(procedure -> procedureMap.put(procedure.getProcId(), 
procedure));
-    procedureList.forEach(procedure -> lastProcId = Math.max(lastProcId, 
procedure.getProcId()));
+    procedureList.forEach(
+        procedure -> lastProcId.set(Math.max(lastProcId.get(), 
procedure.getProcId())));
     return procedureList;
   }
 
@@ -133,7 +135,7 @@ public class ProcedureInfo implements SnapshotProcessor {
   public TSStatus updateProcedure(UpdateProcedurePlan updateProcedurePlan) {
     Procedure<ConfigNodeProcedureEnv> procedure = 
updateProcedurePlan.getProcedure();
     procedureMap.put(procedure.getProcId(), procedure);
-    lastProcId = Math.max(lastProcId, procedure.getProcId());
+    lastProcId.set(Math.max(lastProcId.get(), procedure.getProcId()));
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
@@ -198,7 +200,7 @@ public class ProcedureInfo implements SnapshotProcessor {
     try (FileOutputStream fileOutputStream = new FileOutputStream(mainFile);
         DataOutputStream dataOutputStream = new 
DataOutputStream(fileOutputStream);
         TIOStreamTransport tioStreamTransport = new 
TIOStreamTransport(fileOutputStream)) {
-      ReadWriteIOUtils.write(lastProcId, fileOutputStream);
+      ReadWriteIOUtils.write(lastProcId.get(), fileOutputStream);
       tioStreamTransport.flush();
       fileOutputStream.getFD().sync();
     }
@@ -244,7 +246,7 @@ public class ProcedureInfo implements SnapshotProcessor {
     File mainFile =
         new File(procedureSnapshotDir.getAbsolutePath() + File.separator + 
MAIN_SNAPSHOT_FILENAME);
     try (FileInputStream fileInputStream = new FileInputStream(mainFile)) {
-      lastProcId = ReadWriteIOUtils.readLong(fileInputStream);
+      lastProcId.set(ReadWriteIOUtils.readLong(fileInputStream));
     }
 
     Arrays.stream(Objects.requireNonNull(procedureSnapshotDir.listFiles()))
@@ -262,7 +264,7 @@ public class ProcedureInfo implements SnapshotProcessor {
   }
 
   public long getNextProcId() {
-    return ++this.lastProcId;
+    return this.lastProcId.incrementAndGet();
   }
 
   @Override
@@ -274,7 +276,7 @@ public class ProcedureInfo implements SnapshotProcessor {
       return false;
     }
     ProcedureInfo procedureInfo = (ProcedureInfo) o;
-    return lastProcId == procedureInfo.lastProcId
+    return lastProcId.get() == procedureInfo.lastProcId.get()
         && procedureMap.equals(procedureInfo.procedureMap);
   }
 }

Reply via email to