This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8c8f66668bd Fix procedure concurrency problem caused by non-atomic
procedure Id generation (#12229)
8c8f66668bd is described below
commit 8c8f66668bdde0fc4c9a969729e555e5e257b930
Author: Li Yu Heng <[email protected]>
AuthorDate: Mon Mar 25 13:23:12 2024 +0800
Fix procedure concurrency problem caused by non-atomic procedure Id
generation (#12229)
---
.../iotdb/confignode/persistence/ProcedureInfo.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
index 9a6666eee68..78e03f39ee3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
@@ -59,6 +59,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
public class ProcedureInfo implements SnapshotProcessor {
@@ -76,7 +77,7 @@ public class ProcedureInfo implements SnapshotProcessor {
private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =
new ConcurrentHashMap<>();
- private long lastProcId = -1;
+ private final AtomicLong lastProcId = new AtomicLong(-1);
private final ProcedureFactory procedureFactory =
ProcedureFactory.getInstance();
@@ -104,7 +105,8 @@ public class ProcedureInfo implements SnapshotProcessor {
LOGGER.error("Load procedure wal failed.", e);
}
procedureList.forEach(procedure -> procedureMap.put(procedure.getProcId(),
procedure));
- procedureList.forEach(procedure -> lastProcId = Math.max(lastProcId,
procedure.getProcId()));
+ procedureList.forEach(
+ procedure -> lastProcId.set(Math.max(lastProcId.get(),
procedure.getProcId())));
return procedureList;
}
@@ -133,7 +135,7 @@ public class ProcedureInfo implements SnapshotProcessor {
public TSStatus updateProcedure(UpdateProcedurePlan updateProcedurePlan) {
Procedure<ConfigNodeProcedureEnv> procedure =
updateProcedurePlan.getProcedure();
procedureMap.put(procedure.getProcId(), procedure);
- lastProcId = Math.max(lastProcId, procedure.getProcId());
+ lastProcId.set(Math.max(lastProcId.get(), procedure.getProcId()));
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@@ -198,7 +200,7 @@ public class ProcedureInfo implements SnapshotProcessor {
try (FileOutputStream fileOutputStream = new FileOutputStream(mainFile);
DataOutputStream dataOutputStream = new
DataOutputStream(fileOutputStream);
TIOStreamTransport tioStreamTransport = new
TIOStreamTransport(fileOutputStream)) {
- ReadWriteIOUtils.write(lastProcId, fileOutputStream);
+ ReadWriteIOUtils.write(lastProcId.get(), fileOutputStream);
tioStreamTransport.flush();
fileOutputStream.getFD().sync();
}
@@ -244,7 +246,7 @@ public class ProcedureInfo implements SnapshotProcessor {
File mainFile =
new File(procedureSnapshotDir.getAbsolutePath() + File.separator +
MAIN_SNAPSHOT_FILENAME);
try (FileInputStream fileInputStream = new FileInputStream(mainFile)) {
- lastProcId = ReadWriteIOUtils.readLong(fileInputStream);
+ lastProcId.set(ReadWriteIOUtils.readLong(fileInputStream));
}
Arrays.stream(Objects.requireNonNull(procedureSnapshotDir.listFiles()))
@@ -262,7 +264,7 @@ public class ProcedureInfo implements SnapshotProcessor {
}
public long getNextProcId() {
- return ++this.lastProcId;
+ return this.lastProcId.incrementAndGet();
}
@Override
@@ -274,7 +276,7 @@ public class ProcedureInfo implements SnapshotProcessor {
return false;
}
ProcedureInfo procedureInfo = (ProcedureInfo) o;
- return lastProcId == procedureInfo.lastProcId
+ return lastProcId.get() == procedureInfo.lastProcId.get()
&& procedureMap.equals(procedureInfo.procedureMap);
}
}