This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1512a93845 [IOTDB-2880] Fix procedure worker threads config doesn't
take effect, and remove id lock. (#6129)
1512a93845 is described below
commit 1512a938457041b612ecfb1295955028b5d0e52f
Author: cmlmakahts <[email protected]>
AuthorDate: Fri Jun 3 18:14:55 2022 +0800
[IOTDB-2880] Fix procedure worker threads config doesn't take effect, and
remove id lock. (#6129)
---
.../assembly/resources/conf/iotdb-confignode.properties | 14 --------------
.../apache/iotdb/confignode/manager/ProcedureManager.java | 2 +-
.../confignode/procedure/CompletedProcedureRecycler.java | 3 ---
.../org/apache/iotdb/confignode/procedure/Procedure.java | 2 +-
.../iotdb/confignode/procedure/ProcedureExecutor.java | 12 ------------
5 files changed, 2 insertions(+), 31 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 49dbe85247..b746926955 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -194,20 +194,6 @@ target_confignode=0.0.0.0:22277
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# consensus_dir=data/consensus
-
-# procedure wal dir
-# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/consensus).
-# If it is absolute, system will save the data in exact location it points to.
-# If it is relative, system will save the data in the relative path directory it indicates under the confignode folder.
-# Note: If data_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
-# For windows platform
-# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
-# proc_wal_dir=data\\proc
-# For Linux platform
-# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
-# proc_wal_dir=data/proc
-
-
# UDF lib dir
# If this property is unset, system will save the data in the default relative path directory under
# the UDF folder(i.e., %CONFIGNODE_HOME%/ext/udf).
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index c3cae2178b..30c776f4a0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -68,7 +68,7 @@ public class ProcedureManager {
public void shiftExecutor(boolean running) {
if (running) {
if (!executor.isRunning()) {
- executor.init(configNodeConf.getSchemaReplicationFactor());
+ executor.init(configNodeConf.getProcedureCoreWorkerThreadsSize());
executor.startWorkers();
executor.startCompletedCleaner(
configNodeConf.getProcedureCompletedCleanInterval(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
index 66af46fa9a..46d1b47dbc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
@@ -80,8 +80,5 @@ public class CompletedProcedureRecycler<Env> extends InternalProcedure<Env> {
if (batchCount > 0) {
store.delete(batchIds, 0, batchCount);
}
- // let the store do some cleanup works, i.e, delete the place marker for preserving the max
- // procedure id.
- store.cleanup();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 9561ab177c..ec2a8d024a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -57,7 +57,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
private ProcedureState state = ProcedureState.INITIALIZING;
private int childrenLatch = 0;
- private org.apache.iotdb.confignode.procedure.exception.ProcedureException
exception;
+ private ProcedureException exception;
private volatile long timeout = NO_TIMEOUT;
private volatile long lastUpdate;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 5719a351ac..67980f6404 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -46,13 +46,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
public class ProcedureExecutor<Env> {
private static final Logger LOG =
LoggerFactory.getLogger(ProcedureExecutor.class);
- private final ConcurrentHashMap<Long, ReentrantLock> idLockMap = new
ConcurrentHashMap<>();
-
private final ConcurrentHashMap<Long, CompletedProcedureContainer<Env>>
completed =
new ConcurrentHashMap<>();
@@ -638,10 +635,7 @@ public class ProcedureExecutor<Env> {
* @return procedure lock state
*/
private ProcedureLockState executeRollback(Procedure<Env> procedure) {
- ReentrantLock idLock =
- idLockMap.computeIfAbsent(procedure.getProcId(), procId -> new
ReentrantLock());
try {
- idLock.lock();
procedure.doRollback(this.environment);
} catch (IOException e) {
LOG.error("Roll back failed for {}", procedure, e);
@@ -649,8 +643,6 @@ public class ProcedureExecutor<Env> {
LOG.warn("Interrupted exception occured for {}", procedure, e);
} catch (Throwable t) {
LOG.error("CODE-BUG: runtime exception for {}", procedure, t);
- } finally {
- idLock.unlock();
}
cleanupAfterRollback(procedure);
return ProcedureLockState.LOCK_ACQUIRED;
@@ -746,16 +738,12 @@ public class ProcedureExecutor<Env> {
this.activeProcedure = procedure;
int activeCount = activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
- ReentrantLock idLock =
- idLockMap.computeIfAbsent(procedure.getProcId(), id -> new
ReentrantLock());
- idLock.lock();
executeProcedure(procedure);
activeCount = activeExecutorCount.decrementAndGet();
LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(),
activeCount);
this.activeProcedure = null;
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
- idLock.unlock();
}
} catch (Throwable throwable) {