This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 41c4d48 Fix Periodic rebalancer Timer leak (#1456)
41c4d48 is described below
commit 41c4d48b263128e65f36c3243bfb1b2fdab9f038
Author: xyuanlu <[email protected]>
AuthorDate: Mon Oct 26 12:13:59 2020 -0700
Fix Periodic rebalancer Timer leak (#1456)
In the current startPeriodRebalance, two threads may interfere with each
other. This can result in one timer being canceled twice while two timers are
created, leaking one of them. This PR replaces the Timer with a
SingleThreadScheduledExecutor and adds a synchronized block to
startPeriodRebalance and stopPeriodRebalance.
---
.../helix/controller/GenericHelixController.java | 47 ++++++++++++++--------
1 file changed, 30 insertions(+), 17 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 06e0fa1..a771c43 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -34,6 +34,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -172,13 +173,15 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
private boolean _inMaintenanceMode;
/**
- * The timer that can periodically run the rebalancing pipeline. The timer
will start if there is
- * one resource group has the config to use the timer.
+ * The executors that can periodically run the rebalancing pipeline. A
+ * SingleThreadScheduledExecutor will start if there is resource group that
has the config to do
+ * periodically rebalance.
*/
- Timer _periodicalRebalanceTimer = null;
+ private static final ScheduledExecutorService _periodicalRebalanceExecutor =
+ Executors.newSingleThreadScheduledExecutor();
+ private ScheduledFuture _periodicRebalanceFutureTask = null;
long _timerPeriod = Long.MAX_VALUE;
-
/**
* The timer that triggers the on-demand rebalance pipeline.
*/
@@ -329,19 +332,23 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
/**
* Starts the rebalancing timer with the specified period. Start the timer
if necessary; If the
* period is smaller than the current period, cancel the current timer and
use the new period.
+ * note: For case where 2 threads change value from v0->v1 and v0->v2 at the
same time, the
+ * result is indeterminable.
*/
void startPeriodRebalance(long period, HelixManager manager) {
if (period != _timerPeriod) {
logger.info("Controller starting periodical rebalance timer at period "
+ period);
- if (_periodicalRebalanceTimer != null) {
- _periodicalRebalanceTimer.cancel();
+ ScheduledFuture lastScheduledFuture;
+ synchronized (_periodicalRebalanceExecutor) {
+ lastScheduledFuture = _periodicRebalanceFutureTask;
+ _timerPeriod = period;
+ _periodicRebalanceFutureTask = _periodicalRebalanceExecutor
+ .scheduleAtFixedRate(new RebalanceTask(manager,
ClusterEventType.PeriodicalRebalance),
+ _timerPeriod, _timerPeriod, TimeUnit.MILLISECONDS);
+ }
+ if (lastScheduledFuture != null && !lastScheduledFuture.isCancelled()) {
+ lastScheduledFuture.cancel(false /* mayInterruptIfRunning */);
}
- _periodicalRebalanceTimer =
- new Timer("GenericHelixController_" + _clusterName +
"_periodical_Timer", true);
- _timerPeriod = period;
- _periodicalRebalanceTimer
- .scheduleAtFixedRate(new RebalanceTask(manager,
ClusterEventType.PeriodicalRebalance),
- _timerPeriod, _timerPeriod);
} else {
logger.info("Controller already has periodical rebalance timer at period
" + _timerPeriod);
}
@@ -352,11 +359,11 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
*/
void stopPeriodRebalance() {
logger.info("Controller stopping periodical rebalance timer at period " +
_timerPeriod);
- if (_periodicalRebalanceTimer != null) {
- _periodicalRebalanceTimer.cancel();
- _periodicalRebalanceTimer = null;
- _timerPeriod = Long.MAX_VALUE;
- logger.info("Controller stopped periodical rebalance timer at period " +
_timerPeriod);
+ synchronized (_periodicalRebalanceExecutor) {
+ if (_periodicRebalanceFutureTask != null &&
!_periodicRebalanceFutureTask.isCancelled()) {
+ _periodicRebalanceFutureTask.cancel(false /* mayInterruptIfRunning */);
+ _timerPeriod = Long.MAX_VALUE;
+ }
}
}
@@ -1300,6 +1307,12 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
public void shutdown() throws InterruptedException {
stopPeriodRebalance();
+ _periodicalRebalanceExecutor.shutdown();
+ if (!_periodicalRebalanceExecutor
+ .awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, TimeUnit.MILLISECONDS)) {
+ _periodicalRebalanceExecutor.shutdownNow();
+ }
+
shutdownOnDemandTimer();
logger.info("Shutting down {} pipeline", Pipeline.Type.DEFAULT.name());
shutdownPipeline(_eventThread, _eventQueue);