This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new fbefa6490 Change TopStateHandoffReportStage to be an Async Stage as it
is the slowest stage in the pipeline. It can be run async since it does not add
to ClusterEvent and simply computes and reports metrics. No following stages
depend on it. (#2610)
fbefa6490 is described below
commit fbefa64909781d479ec486a93d22bba416322fb8
Author: Zachary Pinto <[email protected]>
AuthorDate: Thu Aug 31 20:10:13 2023 -0700
Change TopStateHandoffReportStage to be an Async Stage as it is the slowest
stage in the pipeline. It can be run async since it does not add to
ClusterEvent and simply computes and reports metrics. No following stages
depend on it. (#2610)
In some cases TopStateHandoffReportStage takes up a very large portion of
the total pipeline execution time. To reduce the total pipeline execution
time, we make TopStateHandoffReportStage async, since it only computes and
reports metrics and does not add anything to ClusterEvent.
---
.../helix/controller/pipeline/AsyncWorkerType.java | 1 +
.../stages/TopStateHandoffReportStage.java | 22 +++++++++++++++-------
.../helix/controller/stages/BaseStageTest.java | 11 ++++++++---
3 files changed, 24 insertions(+), 10 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
index fcbf03f6b..a1afb95f2 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
@@ -27,6 +27,7 @@ package org.apache.helix.controller.pipeline;
*/
public enum AsyncWorkerType {
+ TopStateHandoffReportWorker,
TargetExternalViewCalcWorker,
PersistAssignmentWorker,
ExternalViewComputeWorker,
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
index 77a84a448..aec55aae5 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
@@ -26,7 +26,8 @@ import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.LiveInstance;
@@ -42,20 +43,27 @@ import org.slf4j.LoggerFactory;
/**
* Observe top state handoff and report latency
*/
-public class TopStateHandoffReportStage extends AbstractBaseStage {
+public class TopStateHandoffReportStage extends AbstractAsyncBaseStage {
private static final long DEFAULT_HANDOFF_USER_LATENCY = 0L;
private static Logger LOG =
LoggerFactory.getLogger(TopStateHandoffReportStage.class);
public static final long TIMESTAMP_NOT_RECORDED = -1L;
@Override
- public void process(ClusterEvent event) throws Exception {
+ public AsyncWorkerType getAsyncWorkerType() {
+ return AsyncWorkerType.TopStateHandoffReportWorker;
+ }
+
+ @Override
+ public void execute(final ClusterEvent event) throws Exception {
_eventId = event.getEventId();
- final BaseControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
- final Long lastPipelineFinishTimestamp = event
-
.getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(),
+ final BaseControllerDataProvider cache =
+ event.getAttribute(AttributeName.ControllerDataProvider.name());
+ final Long lastPipelineFinishTimestamp =
+
event.getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(),
TIMESTAMP_NOT_RECORDED);
final Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES.name());
- final CurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.name());
+ final CurrentStateOutput currentStateOutput =
+ event.getAttribute(AttributeName.CURRENT_STATE.name());
final ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 2d4aa0988..fe77383e0 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -31,8 +31,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.model.Message;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.mock.MockHelixAdmin;
@@ -44,10 +43,12 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.ITestContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -209,7 +210,11 @@ public class BaseStageTest {
stage.init(context);
stage.preProcess();
try {
- stage.process(event);
+ if (stage instanceof AbstractAsyncBaseStage) {
+ ((AbstractAsyncBaseStage) stage).execute(event);
+ } else {
+ stage.process(event);
+ }
} catch (Exception e) {
e.printStackTrace();
}