This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new e4f3da4 Change the way Helix triggers rebalance (#472)
e4f3da4 is described below
commit e4f3da4cdb0c316763476f0dd92bffd01c1cc2ab
Author: Ali Reza Zamani Zadeh Najari <[email protected]>
AuthorDate: Tue Sep 24 10:28:24 2019 -0700
Change the way Helix triggers rebalance (#472)
A method is added that generates an OnDemandRebalance event.
This event makes the controller run a rebalance in both of its pipelines.
"Touch" logic (directly reading and writing ZNodes to trigger a rebalance)
has been removed from its call sites and replaced by this new method where a
trigger is still needed; the old touch-based methods are now deprecated.
---
.../helix/common/caches/InstanceMessagesCache.java | 12 ++----
.../helix/controller/GenericHelixController.java | 43 ++++++++++++++++++++--
.../rebalancer/util/RebalanceScheduler.java | 7 +++-
.../main/java/org/apache/helix/task/TaskUtil.java | 4 +-
.../java/org/apache/helix/util/RebalanceUtil.java | 23 ++++++++++++
.../TestDelayedAutoRebalance.java | 2 -
...stDelayedAutoRebalanceWithDisabledInstance.java | 2 -
.../rebalancer/TestMixedModeAutoRebalance.java | 8 ----
8 files changed, 74 insertions(+), 27 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index 97ece27..56045a6 100644
---
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -39,6 +39,7 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -454,14 +455,9 @@ public class InstanceMessagesCache {
// Schedule a future rebalance pipeline run.
private void scheduleFuturePipeline(long rebalanceTime) {
- GenericHelixController controller =
GenericHelixController.getController(_clusterName);
- if (controller != null) {
- controller.scheduleRebalance(rebalanceTime);
- } else {
- LOG.warn(
- "Failed to schedule a future pipeline run for cluster {} at delay
{}, helix controller is null.",
- _clusterName, (rebalanceTime - System.currentTimeMillis()));
- }
+ long current = System.currentTimeMillis();
+ long delay = rebalanceTime - current;
+ RebalanceUtil.scheduleOnDemandPipeline(_clusterName, delay);
}
/**
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 931b4ef..b80b8cc 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -318,8 +318,10 @@ public class GenericHelixController implements
IdealStateChangeListener,
}
/**
+ * This function is deprecated. Please use
RebalanceUtil.scheduleOnDemandPipeline instead (it takes a delay, not an absolute time).
* schedule a future rebalance pipeline run, delayed at given time.
*/
+ @Deprecated
public void scheduleRebalance(long rebalanceTime) {
if (_helixManager == null) {
logger.warn(
@@ -331,10 +333,6 @@ public class GenericHelixController implements
IdealStateChangeListener,
long delay = rebalanceTime - current;
if (rebalanceTime > current) {
- if (_onDemandRebalanceTimer == null) {
- _onDemandRebalanceTimer = new Timer(true);
- }
-
RebalanceTask preTask = _nextRebalanceTask.get();
if (preTask != null && preTask.getNextRebalanceTime() > current
&& preTask.getNextRebalanceTime() < rebalanceTime) {
@@ -357,6 +355,39 @@ public class GenericHelixController implements
IdealStateChangeListener,
}
}
+ /**
+ * Schedule an on-demand rebalance pipeline run.
+ * @param delay delay in milliseconds before the pipeline runs
+ */
+ public void scheduleOnDemandRebalance(long delay) {
+ if (_helixManager == null) {
+ logger.error("Failed to schedule a future pipeline run for cluster {}.
Helix manager is null!",
+ _clusterName);
+ return;
+ }
+ long currentTime = System.currentTimeMillis();
+ long rebalanceTime = currentTime + delay;
+ if (delay > 0) {
+ RebalanceTask preTask = _nextRebalanceTask.get();
+ if (preTask != null && preTask.getNextRebalanceTime() > currentTime
+ && preTask.getNextRebalanceTime() < rebalanceTime) {
+ // already have an earlier rebalance scheduled, no need to schedule
again.
+ return;
+ }
+ }
+
+ RebalanceTask newTask =
+ new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance,
rebalanceTime);
+
+ _onDemandRebalanceTimer.schedule(newTask, delay);
+ logger.info("Scheduled instant pipeline run for cluster {}." ,
_helixManager.getClusterName());
+
+ RebalanceTask preTask = _nextRebalanceTask.getAndSet(newTask);
+ if (preTask != null) {
+ preTask.cancel();
+ }
+ }
+
private static PipelineRegistry createDefaultRegistry(String pipelineName) {
logger.info("createDefaultRegistry");
synchronized (GenericHelixController.class) {
@@ -457,6 +488,8 @@ public class GenericHelixController implements
IdealStateChangeListener,
registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess,
rebalancePipeline);
registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh,
dataPreprocess,
rebalancePipeline);
+ registry.register(ClusterEventType.OnDemandRebalance, dataRefresh,
dataPreprocess,
+ rebalancePipeline);
return registry;
}
}
@@ -488,6 +521,8 @@ public class GenericHelixController implements
IdealStateChangeListener,
_asyncFIFOWorkerPool = new HashMap<>();
initializeAsyncFIFOWorkers();
+ _onDemandRebalanceTimer = new Timer(true);
+
// initialize pipelines at the end so we have everything else prepared
if (_enabledPipelineTypes.contains(Pipeline.Type.DEFAULT)) {
logger.info("Initializing {} pipeline", Pipeline.Type.DEFAULT.name());
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
index 4812ce9..732734c 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
@@ -6,6 +6,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,15 +125,17 @@ public class RebalanceScheduler {
@Override
public void run() {
- invokeRebalance(_manager.getHelixDataAccessor(), _resource);
+ RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(), 0L);
}
}
/**
+ * This function is deprecated. Please use
RebalanceUtil.scheduleOnDemandPipeline method instead.
* Trigger the controller to perform rebalance for a given resource.
* @param accessor Helix data accessor
* @param resource the name of the resource changed to triggering the
execution
*/
+ @Deprecated
public static void invokeRebalance(HelixDataAccessor accessor, String
resource) {
LOG.info("invoke rebalance for " + resource);
PropertyKey key = accessor.keyBuilder().idealStates(resource);
@@ -149,10 +152,12 @@ public class RebalanceScheduler {
}
/**
+ * This function is deprecated. Please use
RebalanceUtil.scheduleOnDemandPipeline method instead.
* Trigger the controller to perform rebalance for a given resource.
* @param accessor Helix data accessor
* @param resource the name of the resource changed to triggering the
execution
*/
+ @Deprecated
public static void invokeRebalanceForResourceConfig(HelixDataAccessor
accessor, String resource) {
LOG.info("invoke rebalance for " + resource);
PropertyKey key = accessor.keyBuilder().resourceConfig(resource);
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index ba642ae..b746fd2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -39,6 +39,7 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.util.RebalanceUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
@@ -1024,8 +1025,7 @@ public class TaskUtil {
List<String> resourceConfigs =
accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
if (resourceConfigs.size() > 0) {
-
RebalanceScheduler.invokeRebalanceForResourceConfig(manager.getHelixDataAccessor(),
- resourceConfigs.get(0));
+ RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(),
0L);
} else {
LOG.warn(
"No resource config to trigger rebalance for clean up contexts
for" + expiredJobs);
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 3f8c406..18163bd 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -27,10 +27,15 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.helix.HelixException;
+import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.StateModelDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RebalanceUtil {
+ private static final Logger LOG =
LoggerFactory.getLogger(RebalanceUtil.class.getName());
+
public static Map<String, Object> buildInternalIdealState(IdealState state) {
// Try parse the partition number from name DB_n. If not, sort the
partitions and
// assign id
@@ -136,4 +141,22 @@ public class RebalanceUtil {
result[1] = slaveStateValue;
return result;
}
+
+ public static void scheduleOnDemandPipeline(String clusterName, long delay) {
+ if (clusterName == null) {
+ LOG.error("Failed to issue a pipeline run. ClusterName is null.");
+ return;
+ }
+ if (delay < 0L) {
+ LOG.error("Failed to issue a pipeline run. Delay is invalid.");
+ return;
+ }
+ GenericHelixController controller =
GenericHelixController.getController(clusterName);
+ if (controller != null) {
+ controller.scheduleOnDemandRebalance(delay);
+ } else {
+ LOG.error("Failed to issue a pipeline. Controller for cluster {} does
not exist.",
+ clusterName);
+ }
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
index 167b3ae..0105a51 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -196,8 +196,6 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
// disable delay rebalance for the entire cluster.
enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
- // TODO: remove this once controller is listening on cluster config change.
- RebalanceScheduler.invokeRebalance(_controller.getHelixDataAccessor(),
_testDBs.get(0));
Thread.sleep(100);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
index 4a89db9..746bdf3 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
@@ -272,8 +272,6 @@ public class TestDelayedAutoRebalanceWithDisabledInstance
extends TestDelayedAut
// disable delay rebalance for the entire cluster.
enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
- // TODO: remove this once controller is listening on cluster config change.
- RebalanceScheduler.invokeRebalance(_controller.getHelixDataAccessor(),
_testDBs.get(0));
Thread.sleep(2000);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index a31229a..76560e9 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -141,8 +141,6 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
new
ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build();
_configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
- //TODO: Trigger rebalancer, remove this once Helix controller is listening
on resource config changes.
- RebalanceScheduler.invokeRebalance(_dataAccessor, db);
Assert.assertTrue(_clusterVerifier.verify(1000));
verifyUserDefinedPreferenceLists(db, userDefinedPreferenceLists,
userDefinedPartitions);
@@ -192,9 +190,6 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
new
ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build();
_configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
- //TODO: Trigger rebalancer, remove this once Helix controller is listening
on resource config changes.
- RebalanceScheduler.invokeRebalance(_dataAccessor, db);
-
Thread.sleep(1000);
ExternalView ev =
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
@@ -241,9 +236,6 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
resourceConfig.setPreferenceLists(lists);
userDefinedPartitions.remove(0);
_configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
-
- //TODO: Touch IS, remove this once Helix controller is listening on
resource config changes.
- RebalanceScheduler.invokeRebalance(_dataAccessor, db);
}
@AfterClass