Repository: helix Updated Branches: refs/heads/master 7a2b9693d -> 4a99bc43c
[HELIX-710] Create abstract state model for distributed leader standby helix service Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4a99bc43 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4a99bc43 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4a99bc43 Branch: refs/heads/master Commit: 4a99bc43c6f22e478a49fb7f2bbac42d608f17b5 Parents: 7a2b969 Author: Harry Zhang <[email protected]> Authored: Thu Jun 28 14:32:51 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Thu Jun 28 14:32:51 2018 -0700 ---------------------------------------------------------------------- .../AbstractHelixLeaderStandbyStateModel.java | 106 +++++++++++++++++++ .../DistClusterControllerStateModel.java | 68 +++--------- 2 files changed, 123 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/4a99bc43/helix-core/src/main/java/org/apache/helix/participant/AbstractHelixLeaderStandbyStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/participant/AbstractHelixLeaderStandbyStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/AbstractHelixLeaderStandbyStateModel.java new file mode 100644 index 0000000..ba1982a --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/participant/AbstractHelixLeaderStandbyStateModel.java @@ -0,0 +1,106 @@ +package org.apache.helix.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.StateModelParser; +import org.apache.helix.participant.statemachine.StateTransitionError; +import org.apache.helix.participant.statemachine.Transition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generic leader-standby state model impl for helix services. It requires implementing + * service-specific o->s, s->l, l->s, s->o, and reset methods, and provides + * default impl for the rest + */ + +@StateModelInfo(initialState = "OFFLINE", states = { + "LEADER", "STANDBY" +}) +public abstract class AbstractHelixLeaderStandbyStateModel extends StateModel { + private final static Logger logger = + LoggerFactory.getLogger(AbstractHelixLeaderStandbyStateModel.class); + protected final String _zkAddr; + + public AbstractHelixLeaderStandbyStateModel(String zkAddr) { + _zkAddr = zkAddr; + StateModelParser parser = new StateModelParser(); + _currentState = parser.getInitialState(getClass()); + } + + @Transition(to = "STANDBY", from = "OFFLINE") + public abstract void onBecomeStandbyFromOffline(Message message, NotificationContext context); + + @Transition(to = "LEADER", from = "STANDBY") + public abstract void onBecomeLeaderFromStandby(Message message, NotificationContext context) + throws Exception; + + @Transition(to = "STANDBY", from = "LEADER") + public abstract
void onBecomeStandbyFromLeader(Message message, NotificationContext context); + + @Transition(to = "OFFLINE", from = "STANDBY") + public abstract void onBecomeOfflineFromStandby(Message message, NotificationContext context); + + @Transition(to = "DROPPED", from = "OFFLINE") + public abstract void onBecomeDroppedFromOffline(Message message, NotificationContext context); + + @Transition(to = "OFFLINE", from = "DROPPED") + public void onBecomeOfflineFromDropped( + Message message, NotificationContext context) { + reset(); + logStateTransition("DROPPED", "OFFLINE", message == null ? "" : message.getPartitionName(), + message == null ? "" : message.getTgtName()); + } + + @Transition(to = "OFFLINE", from = "ERROR") + public void onBecomeOfflineFromError(Message message, NotificationContext context) { + reset(); + logStateTransition("ERROR", "OFFLINE", message == null ? "" : message.getPartitionName(), + message == null ? "" : message.getTgtName()); + } + + @Override + public void rollbackOnError(Message message, NotificationContext context, + StateTransitionError error) { + reset(); + logger.info("{} rolled back on error. Code: {}, Exception: {}", + getStateModeInstanceDescription(message == null ? "" : message.getPartitionName(), + message == null ? "" : message.getTgtName()), error == null ? "" : error.getCode(), + error == null ? 
"" : error.getException()); + } + + @Override + public abstract void reset(); + + protected String getStateModeInstanceDescription(String partitionName, String instanceName) { + return String.format("%s(%s) for partition %s on instance %s", + this.getClass().getSimpleName(), this.hashCode(), partitionName, instanceName); + } + + protected void logStateTransition(String fromState, String toState, String partitionName, + String instanceName) { + logger.info("Helix Service {} became {} from {}.", + getStateModeInstanceDescription(partitionName, instanceName), toState, fromState); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/4a99bc43/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index 1195a24..e144a51 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -24,42 +24,33 @@ import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.model.Message; -import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelInfo; -import org.apache.helix.participant.statemachine.StateModelParser; -import org.apache.helix.participant.statemachine.StateTransitionError; -import org.apache.helix.participant.statemachine.Transition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @StateModelInfo(initialState = "OFFLINE", states = { "LEADER", "STANDBY" }) -public class DistClusterControllerStateModel extends StateModel { +public class DistClusterControllerStateModel 
extends AbstractHelixLeaderStandbyStateModel { private static Logger logger = LoggerFactory.getLogger(DistClusterControllerStateModel.class); protected HelixManager _controller = null; - protected final String _zkAddr; public DistClusterControllerStateModel(String zkAddr) { - StateModelParser parser = new StateModelParser(); - _currentState = parser.getInitialState(DistClusterControllerStateModel.class); - _zkAddr = zkAddr; + super(zkAddr); } - @Transition(to = "STANDBY", from = "OFFLINE") + @Override public void onBecomeStandbyFromOffline(Message message, NotificationContext context) { - logger.info("Becoming standby from offline for " + message.getResourceName() + " and " + message - .getPartitionName()); + logStateTransition("OFFLINE", "STANDBY", message.getPartitionName(), message.getTgtName()); } - @Transition(to = "LEADER", from = "STANDBY") + @Override public void onBecomeLeaderFromStandby(Message message, NotificationContext context) throws Exception { String clusterName = message.getPartitionName(); String controllerName = message.getTgtName(); - logger.info(controllerName + " becomes leader from standby for " + clusterName); - // System.out.println(controllerName + " becomes leader from standby for " + clusterName); + logger.info(controllerName + " becoming leader from standby for " + clusterName); if (_controller == null) { _controller = @@ -67,6 +58,7 @@ public class DistClusterControllerStateModel extends StateModel { InstanceType.CONTROLLER, _zkAddr); _controller.connect(); _controller.startTimerTasks(); + logStateTransition("STANDBY", "LEADER", clusterName, controllerName); } else { logger.error("controller already exists:" + _controller.getInstanceName() + " for " + clusterName); @@ -74,7 +66,7 @@ public class DistClusterControllerStateModel extends StateModel { } - @Transition(to = "STANDBY", from = "LEADER") + @Override public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { String clusterName = 
message.getPartitionName(); String controllerName = message.getTgtName(); @@ -82,59 +74,33 @@ public class DistClusterControllerStateModel extends StateModel { logger.info(controllerName + " becoming standby from leader for " + clusterName); if (_controller != null) { - _controller.disconnect(); - _controller = null; + reset(); + logStateTransition("LEADER", "STANDBY", clusterName, controllerName); } else { logger.error("No controller exists for " + clusterName); } } - @Transition(to = "OFFLINE", from = "STANDBY") + @Override public void onBecomeOfflineFromStandby(Message message, NotificationContext context) { - String clusterName = message.getPartitionName(); - String controllerName = message.getTgtName(); - - logger.info(controllerName + " becoming offline from standby for cluster:" + clusterName); - + logStateTransition("STANDBY", "OFFLINE", message.getPartitionName(), message.getTgtName()); } - @Transition(to = "DROPPED", from = "OFFLINE") + @Override public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { - logger.info("Becoming dropped from offline"); - } - - @Transition(to = "OFFLINE", from = "DROPPED") - public void onBecomeOfflineFromDropped(Message message, NotificationContext context) { - logger.info("Becoming offline from dropped"); - } - - @Transition(to = "OFFLINE", from = "ERROR") - public void onBecomeOfflineFromError(Message message, NotificationContext context) { - logger.info("Becoming offline from error."); reset(); + logStateTransition("OFFLINE", "DROPPED", message == null ? "" : message.getPartitionName(), + message == null ? 
"" : message.getTgtName()); } @Override - public void rollbackOnError(Message message, NotificationContext context, - StateTransitionError error) { - String clusterName = message.getPartitionName(); - String controllerName = message.getTgtName(); - - logger.error(controllerName + " rollbacks on error for " + clusterName); - - if (_controller != null) { - _controller.disconnect(); - _controller = null; - } - + public String getStateModeInstanceDescription(String partitionName, String instanceName) { + return String.format("Controller for cluster %s on instance %s", partitionName, instanceName); } @Override public void reset() { if (_controller != null) { - // System.out.println("disconnect " + _controller.getInstanceName() - // + "(" + _controller.getInstanceType() - // + ") from " + _controller.getClusterName()); _controller.disconnect(); _controller = null; }
