http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java deleted file mode 100644 index 4f549e4..0000000 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java +++ /dev/null @@ -1,693 +0,0 @@ -package org.apache.helix.manager.zk; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.ZkConnection; -import org.apache.helix.BaseDataAccessor; -import org.apache.helix.ClusterMessagingService; -import org.apache.helix.ConfigAccessor; -import org.apache.helix.ConfigChangeListener; -import org.apache.helix.ControllerChangeListener; -import org.apache.helix.CurrentStateChangeListener; -import org.apache.helix.ExternalViewChangeListener; -import org.apache.helix.HealthStateChangeListener; -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerProperties; -import org.apache.helix.HelixTimerTask; -import org.apache.helix.IdealStateChangeListener; -import org.apache.helix.InstanceConfigChangeListener; -import org.apache.helix.InstanceType; -import org.apache.helix.LiveInstanceChangeListener; -import org.apache.helix.LiveInstanceInfoProvider; -import org.apache.helix.MessageListener; -import org.apache.helix.PreConnectCallback; -import org.apache.helix.PropertyKey; -import org.apache.helix.PropertyPathConfig; -import org.apache.helix.PropertyType; -import org.apache.helix.ScopedConfigChangeListener; -import org.apache.helix.ZNRecord; -import org.apache.helix.HelixConstants.ChangeType; -import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.healthcheck.ParticipantHealthReportCollector; -import org.apache.helix.messaging.DefaultMessagingService; -import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; -import org.apache.helix.participant.StateMachineEngine; -import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.log4j.Logger; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooKeeper.States; - -public abstract class AbstractManager implements HelixManager, IZkStateListener { - private static Logger LOG = Logger.getLogger(AbstractManager.class); - - final String _zkAddress; - final String _clusterName; - final String _instanceName; - final InstanceType _instanceType; - final int _sessionTimeout; - final List<PreConnectCallback> _preConnectCallbacks; - protected final List<CallbackHandler> _handlers; - final HelixManagerProperties _properties; - - /** - * helix version# - */ - final String _version; - - protected ZkClient _zkclient = null; - final DefaultMessagingService _messagingService; - - BaseDataAccessor<ZNRecord> _baseDataAccessor; - ZKHelixDataAccessor _dataAccessor; - final Builder _keyBuilder; - ConfigAccessor _configAccessor; - ZkHelixPropertyStore<ZNRecord> _helixPropertyStore; - LiveInstanceInfoProvider _liveInstanceInfoProvider = null; - final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>(); - - volatile String _sessionId; - - /** - * Keep track of timestamps that zk State has become Disconnected - * If in a _timeWindowLengthMs window zk State has become Disconnected - * for more than_maxDisconnectThreshold times disconnect the zkHelixManager - */ - final List<Long> _disconnectTimeHistory = new LinkedList<Long>(); - - final int _flappingTimeWindowMs; - final int _maxDisconnectThreshold; - - public AbstractManager(String zkAddress, String clusterName, String instanceName, - InstanceType instanceType) { - - LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: " - + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType); - - _zkAddress = zkAddress; - _clusterName = clusterName; - _instanceType = instanceType; - _instanceName = instanceName; - _preConnectCallbacks = new LinkedList<PreConnectCallback>(); - _handlers = new ArrayList<CallbackHandler>(); - _properties = new HelixManagerProperties("cluster-manager-version.properties"); - _version = _properties.getVersion(); - - _keyBuilder = new Builder(clusterName); - _messagingService = new DefaultMessagingService(this); - - /** - * use system property if available - */ - _flappingTimeWindowMs = - getSystemPropertyAsInt("helixmanager.flappingTimeWindow", - ZKHelixManager.FLAPPING_TIME_WINDIOW); - - _maxDisconnectThreshold = - getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold", - ZKHelixManager.MAX_DISCONNECT_THRESHOLD); - - _sessionTimeout = - getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT); - - } - - private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) { - String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue); - - try { - int value = Integer.parseInt(valueString); - if (value > 0) { - return value; - } - } catch (NumberFormatException e) { - LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString - + ", using default value: " + propertyDefaultValue); - } - - return propertyDefaultValue; - } - - /** - * different types of helix manager should impl its own handle new session logic - */ - // public abstract void handleNewSession(); - - @Override - public void connect() throws Exception { - LOG.info("ClusterManager.connect()"); - if (isConnected()) { - LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName - + " already connected. skip connect"); - return; - } - - try { - createClient(); - _messagingService.onConnected(); - } catch (Exception e) { - LOG.error("fail to connect " + _instanceName, e); - disconnect(); - throw e; - } - } - - @Override - public boolean isConnected() { - if (_zkclient == null) { - return false; - } - ZkConnection zkconnection = (ZkConnection) _zkclient.getConnection(); - if (zkconnection != null) { - States state = zkconnection.getZookeeperState(); - return state == States.CONNECTED; - } - return false; - } - - /** - * specific disconnect logic for each helix-manager type - */ - abstract void doDisconnect(); - - /** - * This function can be called when the connection are in bad state(e.g. flapping), - * in which isConnected() could be false and we want to disconnect from cluster. - */ - @Override - public void disconnect() { - LOG.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName); - - try { - /** - * stop all timer tasks - */ - stopTimerTasks(); - - /** - * shutdown thread pool first to avoid reset() being invoked in the middle of state - * transition - */ - _messagingService.getExecutor().shutdown(); - - // TODO reset user defined handlers only - resetHandlers(); - - _dataAccessor.shutdown(); - - doDisconnect(); - - _zkclient.unsubscribeAll(); - } finally { - _zkclient.close(); - LOG.info("Cluster manager: " + _instanceName + " disconnected"); - } - } - - @Override - public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception { - addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE, - new EventType[] { - EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated - }); - } - - @Override - public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception { - addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE, - new EventType[] { - EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted, - EventType.NodeCreated - }); - } - - @Override - public void addConfigChangeListener(ConfigChangeListener listener) throws Exception { - addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, - new EventType[] { - EventType.NodeChildrenChanged - }); - } - - @Override - public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener) - throws Exception { - addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, - new EventType[] { - EventType.NodeChildrenChanged - }); - } - - @Override - public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope) - throws Exception { - Builder keyBuilder = new Builder(_clusterName); - - PropertyKey propertyKey = null; - switch (scope) { - case CLUSTER: - propertyKey = keyBuilder.clusterConfigs(); - break; - case PARTICIPANT: - propertyKey = keyBuilder.instanceConfigs(); - break; - case RESOURCE: - propertyKey = keyBuilder.resourceConfigs(); - break; - default: - break; - } - - if (propertyKey != null) { - addListener(listener, propertyKey, ChangeType.CONFIG, new EventType[] { - EventType.NodeChildrenChanged - }); - } else { - LOG.error("Can't add listener to config scope: " + scope); - } - } - - @Override - public void addMessageListener(MessageListener listener, String instanceName) { - addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE, - new EventType[] { - EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated - }); - } - - @Override - public void addCurrentStateChangeListener(CurrentStateChangeListener listener, - String instanceName, String sessionId) throws Exception { - addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId), - ChangeType.CURRENT_STATE, new EventType[] { - EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated - }); - } - - @Override - public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName) - throws Exception { - addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH, - new EventType[] { - EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated - }); - } - - @Override - public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception { - addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW, - new EventType[] { - EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated - }); - } - - @Override - public void addControllerListener(ControllerChangeListener listener) { - addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER, - new EventType[] { - EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated - }); - } - - void addControllerMessageListener(MessageListener listener) { - addListener(listener, new Builder(_clusterName).controllerMessages(), - ChangeType.MESSAGES_CONTROLLER, new EventType[] { - EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated - }); - } - - @Override - public boolean removeListener(PropertyKey key, Object listener) { - LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: " - + _clusterName + " by instance: " + _instanceName); - - synchronized (this) { - List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>(); - for (CallbackHandler handler : _handlers) { - // compare property-key path and listener reference - if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) { - toRemove.add(handler); - } - } - - _handlers.removeAll(toRemove); - - // handler.reset() may modify the handlers list, so do it outside the iteration - for (CallbackHandler handler : toRemove) { - handler.reset(); - } - } - - return true; - } - - @Override - public HelixDataAccessor getHelixDataAccessor() { - checkConnected(); - return _dataAccessor; - } - - @Override - public ConfigAccessor getConfigAccessor() { - checkConnected(); - return _configAccessor; - } - - @Override - public String getClusterName() { - return _clusterName; - } - - @Override - public String getInstanceName() { - return _instanceName; - } - - @Override - public String getSessionId() { - checkConnected(); - return _sessionId; - } - - @Override - public long getLastNotificationTime() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public HelixAdmin getClusterManagmentTool() { - checkConnected(); - if (_zkclient != null) { - return new ZKHelixAdmin(_zkclient); - } - - LOG.error("Couldn't get ZKClusterManagementTool because zkclient is null"); - return null; - } - - @Override - public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() { - checkConnected(); - - if (_helixPropertyStore == null) { - String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName); - - _helixPropertyStore = - new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), path, - null); - } - - return _helixPropertyStore; - } - - @Override - public ClusterMessagingService getMessagingService() { - // The caller can register message handler factories on messaging service before the - // helix manager is connected. Thus we do not check connected here - return _messagingService; - } - - @Override - public ParticipantHealthReportCollector getHealthReportCollector() { - // helix-participant will override this - return null; - } - - @Override - public InstanceType getInstanceType() { - return _instanceType; - } - - @Override - public String getVersion() { - return _version; - } - - @Override - public HelixManagerProperties getProperties() { - return _properties; - } - - @Override - public StateMachineEngine getStateMachineEngine() { - // helix-participant will override this - return null; - } - - @Override - public abstract boolean isLeader(); - - @Override - public void startTimerTasks() { - for (HelixTimerTask task : _timerTasks) { - task.start(); - } - - } - - @Override - public void stopTimerTasks() { - for (HelixTimerTask task : _timerTasks) { - task.stop(); - } - - } - - @Override - public void addPreConnectCallback(PreConnectCallback callback) { - LOG.info("Adding preconnect callback: " + callback); - _preConnectCallbacks.add(callback); - } - - @Override - public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) { - _liveInstanceInfoProvider = liveInstanceInfoProvider; - } - - /** - * wait until we get a non-zero session-id. note that we might lose zkconnection - * right after we read session-id. but it's ok to get stale session-id and we will have - * another handle-new-session callback to correct this. - */ - protected void waitUntilConnected() { - boolean isConnected; - do { - isConnected = - _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); - if (!isConnected) { - LOG.error("fail to connect zkserver: " + _zkAddress + " in " - + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId - + ", clusterName: " + _clusterName); - continue; - } - - ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection()); - _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId()); - - /** - * at the time we read session-id, zkconnection might be lost again - * wait until we get a non-zero session-id - */ - } while ("0".equals(_sessionId)); - - LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName - + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName + ", zkconnection: " - + ((ZkConnection) _zkclient.getConnection()).getZookeeper()); - } - - protected void checkConnected() { - if (!isConnected()) { - throw new HelixException("ClusterManager not connected. Call clusterManager.connect()"); - } - } - - protected void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, - EventType[] eventType) { - checkConnected(); - - PropertyType type = propertyKey.getType(); - - synchronized (this) { - for (CallbackHandler handler : _handlers) { - // compare property-key path and listener reference - if (handler.getPath().equals(propertyKey.getPath()) - && handler.getListener().equals(listener)) { - LOG.info("Listener: " + listener + " on path: " + propertyKey.getPath() - + " already exists. skip add"); - - return; - } - } - - CallbackHandler newHandler = - new CallbackHandler(this, _zkclient, propertyKey, listener, eventType, changeType); - - _handlers.add(newHandler); - LOG.info("Added listener: " + listener + " for type: " + type + " to path: " - + newHandler.getPath()); - } - } - - protected void initHandlers(List<CallbackHandler> handlers) { - synchronized (this) { - if (handlers != null) { - for (CallbackHandler handler : handlers) { - handler.init(); - LOG.info("init handler: " + handler.getPath() + ", " + handler.getListener()); - } - } - } - } - - protected void resetHandlers() { - synchronized (this) { - if (_handlers != null) { - // get a copy of the list and iterate over the copy list - // in case handler.reset() modify the original handler list - List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>(); - tmpHandlers.addAll(_handlers); - - for (CallbackHandler handler : tmpHandlers) { - handler.reset(); - LOG.info("reset handler: " + handler.getPath() + ", " + handler.getListener()); - } - } - } - } - - /** - * different helix-manager may override this to have a cache-enabled based-data-accessor - * @param baseDataAccessor - * @return - */ - BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) { - return baseDataAccessor; - } - - void createClient() throws Exception { - PathBasedZkSerializer zkSerializer = - ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build(); - - _zkclient = - new ZkClient(_zkAddress, _sessionTimeout, ZkClient.DEFAULT_CONNECTION_TIMEOUT, zkSerializer); - - ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient); - - _baseDataAccessor = createBaseDataAccessor(baseDataAccessor); - - _dataAccessor = new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor); - _configAccessor = new ConfigAccessor(_zkclient); - - int retryCount = 0; - - _zkclient.subscribeStateChanges(this); - while (retryCount < 3) { - try { - _zkclient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS); - handleStateChanged(KeeperState.SyncConnected); - handleNewSession(); - break; - } catch (HelixException e) { - LOG.error("fail to createClient.", e); - throw e; - } catch (Exception e) { - retryCount++; - - LOG.error("fail to createClient. retry " + retryCount, e); - if (retryCount == 3) { - throw e; - } - } - } - } - - // TODO separate out flapping detection code - @Override - public void handleStateChanged(KeeperState state) throws Exception { - switch (state) { - case SyncConnected: - ZkConnection zkConnection = (ZkConnection) _zkclient.getConnection(); - LOG.info("KeeperState: " + state + ", zookeeper:" + zkConnection.getZookeeper()); - break; - case Disconnected: - LOG.info("KeeperState:" + state + ", disconnectedSessionId: " + _sessionId + ", instance: " - + _instanceName + ", type: " + _instanceType); - - /** - * Track the time stamp that the disconnected happens, then check history and see if - * we should disconnect the helix-manager - */ - _disconnectTimeHistory.add(System.currentTimeMillis()); - if (isFlapping()) { - LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. " - + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in " - + _flappingTimeWindowMs + "ms."); - disconnect(); - } - break; - case Expired: - LOG.info("KeeperState:" + state + ", expiredSessionId: " + _sessionId + ", instance: " - + _instanceName + ", type: " + _instanceType); - break; - default: - break; - } - } - - /** - * If zk state has changed into Disconnected for _maxDisconnectThreshold times during previous - * _timeWindowLengthMs Ms - * time window, we think that there are something wrong going on and disconnect the zkHelixManager - * from zk. - */ - private boolean isFlapping() { - if (_disconnectTimeHistory.size() == 0) { - return false; - } - long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1); - - // Remove disconnect history timestamp that are older than _flappingTimeWindowMs ago - while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp) { - _disconnectTimeHistory.remove(0); - } - return _disconnectTimeHistory.size() > _maxDisconnectThreshold; - } - - /** - * controller should override it to return a list of timers that need to start/stop when - * leadership changes - * @return - */ - protected List<HelixTimerTask> getControllerHelixTimerTasks() { - return null; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java deleted file mode 100644 index 1ed6dea..0000000 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java +++ /dev/null @@ -1,175 +0,0 @@ -package org.apache.helix.manager.zk; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Timer; - -import org.apache.helix.BaseDataAccessor; -import org.apache.helix.HelixTimerTask; -import org.apache.helix.InstanceType; -import org.apache.helix.PropertyPathConfig; -import org.apache.helix.PropertyType; -import org.apache.helix.ZNRecord; -import org.apache.helix.HelixConstants.ChangeType; -import org.apache.helix.controller.GenericHelixController; -import org.apache.helix.healthcheck.HealthStatsAggregationTask; -import org.apache.helix.healthcheck.HealthStatsAggregator; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.monitoring.ZKPathDataDumpTask; -import org.apache.log4j.Logger; -import org.apache.zookeeper.Watcher.Event.EventType; - -public class ControllerManager extends AbstractManager { - private static Logger LOG = Logger.getLogger(ControllerManager.class); - - final GenericHelixController _controller = new GenericHelixController(); - - // TODO merge into GenericHelixController - private CallbackHandler _leaderElectionHandler = null; - - /** - * status dump timer-task - */ - static class StatusDumpTask extends HelixTimerTask { - Timer _timer = null; - final ZkClient zkclient; - final AbstractManager helixController; - - public StatusDumpTask(ZkClient zkclient, AbstractManager helixController) { - this.zkclient = zkclient; - this.helixController = helixController; - } - - @Override - public void start() { - long initialDelay = 30 * 60 * 1000; - long period = 120 * 60 * 1000; - int timeThresholdNoChange = 180 * 60 * 1000; - - if (_timer == null) { - LOG.info("Start StatusDumpTask"); - _timer = new Timer("StatusDumpTimerTask", true); - _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient, - timeThresholdNoChange), initialDelay, period); - } - - } - - @Override - public void stop() { - if (_timer != null) { - LOG.info("Stop StatusDumpTask"); - _timer.cancel(); - _timer = null; - } - } - } - - public ControllerManager(String zkAddress, String clusterName, String instanceName) { - super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER); - - _timerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this))); - _timerTasks.add(new StatusDumpTask(_zkclient, this)); - } - - @Override - protected List<HelixTimerTask> getControllerHelixTimerTasks() { - return _timerTasks; - } - - @Override - public void handleNewSession() throws Exception { - waitUntilConnected(); - - /** - * reset all handlers, make sure cleanup completed for previous session - * disconnect if fail to cleanup - */ - if (_leaderElectionHandler != null) { - _leaderElectionHandler.reset(); - } - // TODO reset user defined handlers only - resetHandlers(); - - /** - * from here on, we are dealing with new session - */ - - if (_leaderElectionHandler != null) { - _leaderElectionHandler.init(); - } else { - _leaderElectionHandler = - new CallbackHandler(this, _zkclient, _keyBuilder.controller(), - new DistributedLeaderElection(this, _controller), new EventType[] { - EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated - }, ChangeType.CONTROLLER); - } - - /** - * init handlers - * ok to init message handler and controller handlers twice - * the second init will be skipped (see CallbackHandler) - */ - initHandlers(_handlers); - } - - @Override - void doDisconnect() { - if (_leaderElectionHandler != null) { - _leaderElectionHandler.reset(); - } - } - - @Override - public boolean isLeader() { - if (!isConnected()) { - return false; - } - - try { - LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader()); - if (leader != null) { - String leaderName = leader.getInstanceName(); - String sessionId = leader.getSessionId(); - if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null - && sessionId.equals(_sessionId)) { - return true; - } - } - } catch (Exception e) { - // log - } - return false; - } - - /** - * helix-controller uses a write-through cache for external-view - */ - @Override - BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) { - String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName); - return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(extViewPath)); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java index ff3a264..d2b520b 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java @@ -36,14 +36,14 @@ import org.apache.log4j.Logger; public class ControllerManagerHelper { private static Logger LOG = Logger.getLogger(ControllerManagerHelper.class); - final AbstractManager _manager; + final HelixManager _manager; final DefaultMessagingService _messagingService; final List<HelixTimerTask> _controllerTimerTasks; - public ControllerManagerHelper(AbstractManager manager) { + public ControllerManagerHelper(HelixManager manager, List<HelixTimerTask> controllerTimerTasks) { _manager = manager; _messagingService = (DefaultMessagingService) manager.getMessagingService(); - _controllerTimerTasks = manager.getControllerHelixTimerTasks(); + _controllerTimerTasks = controllerTimerTasks; } public void addListenersToController(GenericHelixController controller) { http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java deleted file mode 100644 index c9ad0f3..0000000 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java +++ /dev/null @@ -1,190 +0,0 @@ -package org.apache.helix.manager.zk; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.List; - -import org.apache.helix.HelixException; -import org.apache.helix.HelixTimerTask; -import org.apache.helix.InstanceType; -import org.apache.helix.PreConnectCallback; -import org.apache.helix.HelixConstants.ChangeType; -import org.apache.helix.controller.GenericHelixController; -import org.apache.helix.healthcheck.HealthStatsAggregationTask; -import org.apache.helix.healthcheck.HealthStatsAggregator; -import org.apache.helix.healthcheck.ParticipantHealthReportCollector; -import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl; -import org.apache.helix.healthcheck.ParticipantHealthReportTask; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.participant.HelixStateMachineEngine; -import org.apache.helix.participant.StateMachineEngine; -import org.apache.log4j.Logger; -import org.apache.zookeeper.Watcher.Event.EventType; - -public class DistributedControllerManager extends AbstractManager { - private static Logger LOG = Logger.getLogger(DistributedControllerManager.class); - - final StateMachineEngine _stateMachineEngine; - final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector; - - CallbackHandler _leaderElectionHandler = null; - final GenericHelixController _controller = new GenericHelixController(); - - /** - * hold timer tasks for controller only - * we need to add/remove controller timer tasks during handle new session - */ - final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<HelixTimerTask>(); - - public DistributedControllerManager(String zkAddress, String clusterName, String instanceName) { - super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER_PARTICIPANT); - - _stateMachineEngine = new HelixStateMachineEngine(this); - _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName); - - _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector)); - - _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this))); - _controllerTimerTasks.add(new ControllerManager.StatusDumpTask(_zkclient, this)); - - } - - @Override - public ParticipantHealthReportCollector getHealthReportCollector() { - checkConnected(); - return _participantHealthInfoCollector; - } - - @Override - public StateMachineEngine getStateMachineEngine() { - return _stateMachineEngine; - } - - @Override - protected List<HelixTimerTask> getControllerHelixTimerTasks() { - return _controllerTimerTasks; - } - - @Override - public void handleNewSession() throws Exception { - waitUntilConnected(); - - ParticipantManagerHelper participantHelper = - new ParticipantManagerHelper(this, _zkclient, _sessionTimeout); - - /** - * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session - * disconnect if fail to cleanup - */ - stopTimerTasks(); - if (_leaderElectionHandler != null) { - _leaderElectionHandler.reset(); - } - resetHandlers(); - - /** - * clean up write-through cache - */ - _baseDataAccessor.reset(); - - /** - * from here on, we are dealing with new session - */ - if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) { - throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName); - } - - /** - * auto-join - */ - participantHelper.joinCluster(); - - /** - * Invoke PreConnectCallbacks - */ - for (PreConnectCallback callback : _preConnectCallbacks) { - callback.onPreConnect(); - } - - participantHelper.createLiveInstance(); - - participantHelper.carryOverPreviousCurrentState(); - - participantHelper.setupMsgHandler(); - - /** - * leader election - */ - if (_leaderElectionHandler != null) { - _leaderElectionHandler.init(); - } else { - _leaderElectionHandler = - new CallbackHandler(this, _zkclient, _keyBuilder.controller(), - new DistributedLeaderElection(this, _controller), new EventType[] { - EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated - }, ChangeType.CONTROLLER); - } - - /** - * start health-check timer task - */ - participantHelper.createHealthCheckPath(); - startTimerTasks(); - - /** - * init handlers - * ok to init message handler, data-accessor, and controller handlers twice - * the second init will be skipped (see CallbackHandler) - */ - initHandlers(_handlers); - - } - - @Override - void doDisconnect() { - if (_leaderElectionHandler != null) { - _leaderElectionHandler.reset(); - } - } - - @Override - public boolean isLeader() { - if (!isConnected()) { - return false; - } - - try { - LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader()); - if (leader != null) { - String leaderName = leader.getInstanceName(); - String sessionId = leader.getSessionId(); - if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null - && sessionId.equals(_sessionId)) { - return true; - } - } - } catch (Exception e) { - // log - } - return false; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java index 0ab8342..6a6d296 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java @@ -20,10 +20,12 @@ package org.apache.helix.manager.zk; */ import java.lang.management.ManagementFactory; +import java.util.List; import org.apache.helix.ControllerChangeListener; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; +import org.apache.helix.HelixTimerTask; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.PropertyType; @@ -40,12 +42,15 @@ import org.apache.log4j.Logger; public class DistributedLeaderElection implements ControllerChangeListener { private static Logger LOG = Logger.getLogger(DistributedLeaderElection.class); - final AbstractManager _manager; + final HelixManager _manager; final GenericHelixController _controller; + final List<HelixTimerTask> _controllerTimerTasks; - public DistributedLeaderElection(AbstractManager manager, GenericHelixController controller) { + public DistributedLeaderElection(HelixManager manager, GenericHelixController controller, + List<HelixTimerTask> controllerTimerTasks) { _manager = manager; _controller = controller; + _controllerTimerTasks = controllerTimerTasks; } /** @@ -68,7 +73,8 @@ public class DistributedLeaderElection implements ControllerChangeListener { return; } - ControllerManagerHelper controllerHelper = new ControllerManagerHelper(_manager); + ControllerManagerHelper controllerHelper = + new ControllerManagerHelper(_manager, _controllerTimerTasks); try { if (changeContext.getType().equals(NotificationContext.Type.INIT) || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) { @@ -84,7 +90,7 @@ public class DistributedLeaderElection implements ControllerChangeListener { + _manager.getClusterName()); updateHistory(manager); - _manager._baseDataAccessor.reset(); + _manager.getHelixDataAccessor().getBaseDataAccessor().reset(); controllerHelper.addListenersToController(_controller); controllerHelper.startControllerTimerTasks(); } @@ -98,7 +104,7 @@ public class DistributedLeaderElection implements ControllerChangeListener { /** * clear write-through cache */ - _manager._baseDataAccessor.reset(); + _manager.getHelixDataAccessor().getBaseDataAccessor().reset(); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java deleted file mode 100644 index 0af7e77..0000000 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java +++ /dev/null @@ -1,155 +0,0 @@ -package org.apache.helix.manager.zk; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.helix.BaseDataAccessor; -import org.apache.helix.HelixException; -import org.apache.helix.InstanceType; -import org.apache.helix.PreConnectCallback; -import org.apache.helix.PropertyPathConfig; -import org.apache.helix.PropertyType; -import org.apache.helix.ZNRecord; -import org.apache.helix.healthcheck.ParticipantHealthReportCollector; -import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl; -import org.apache.helix.healthcheck.ParticipantHealthReportTask; -import org.apache.helix.participant.HelixStateMachineEngine; -import org.apache.helix.participant.StateMachineEngine; -import org.apache.log4j.Logger; - -public class ParticipantManager extends AbstractManager { - - private static Logger LOG = Logger.getLogger(ParticipantManager.class); - - /** - * state-transition message handler factory for helix-participant - */ - final StateMachineEngine _stateMachineEngine; - - final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector; - - public ParticipantManager(String zkAddress, String clusterName, String instanceName) { - super(zkAddress, clusterName, instanceName, InstanceType.PARTICIPANT); - - _stateMachineEngine = new HelixStateMachineEngine(this); - _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName); - - _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector)); - } - - @Override - public ParticipantHealthReportCollector getHealthReportCollector() { - checkConnected(); - return _participantHealthInfoCollector; - } - - @Override - public StateMachineEngine getStateMachineEngine() { - return _stateMachineEngine; - } - - @Override - public void handleNewSession() { - waitUntilConnected(); - - /** - * stop timer tasks, reset all handlers, make sure cleanup completed for previous session - * disconnect if cleanup fails - */ - stopTimerTasks(); - resetHandlers(); - - /** - * clear write-through cache - */ - _baseDataAccessor.reset(); - - /** - * from here on, we are dealing with new session - */ - if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) { - throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName); - } - - /** - * auto-join - */ - ParticipantManagerHelper participantHelper = - new ParticipantManagerHelper(this, _zkclient, _sessionTimeout); - participantHelper.joinCluster(); - - /** - * Invoke PreConnectCallbacks - */ - for (PreConnectCallback callback : _preConnectCallbacks) { - callback.onPreConnect(); - } - - participantHelper.createLiveInstance(); - - participantHelper.carryOverPreviousCurrentState(); - - /** - * setup message listener - */ - participantHelper.setupMsgHandler(); - - /** - * start health check timer task - */ - participantHelper.createHealthCheckPath(); - startTimerTasks(); - - /** - * init handlers - * ok to init message handler and data-accessor twice - * the second init will be skipped (see CallbackHandler) - */ - initHandlers(_handlers); - - } - - /** - * helix-participant uses a write-through cache for current-state - */ - @Override - BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) { - String curStatePath = - PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName); - return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(curStatePath)); - - } - - @Override - public boolean isLeader() { - return false; - } - - /** - * disconnect logic for helix-participant - */ - @Override - void doDisconnect() { - // nothing for participant - } -} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java index 70dd592..e7f9efb 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java @@ -31,6 +31,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; +import org.apache.helix.LiveInstanceInfoProvider; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; import org.apache.helix.messaging.DefaultMessagingService; @@ -55,7 +56,7 @@ public class ParticipantManagerHelper { private static Logger LOG = Logger.getLogger(ParticipantManagerHelper.class); final ZkClient _zkclient; - final AbstractManager _manager; + final HelixManager _manager; final PropertyKey.Builder _keyBuilder; final String _clusterName; final String _instanceName; @@ -67,8 +68,10 @@ public class ParticipantManagerHelper { final ZKHelixDataAccessor _dataAccessor; final DefaultMessagingService _messagingService; final StateMachineEngine _stateMachineEngine; + final LiveInstanceInfoProvider _liveInstanceInfoProvider; - public ParticipantManagerHelper(AbstractManager manager, ZkClient zkclient, int sessionTimeout) { + public ParticipantManagerHelper(HelixManager manager, ZkClient zkclient, int sessionTimeout, + LiveInstanceInfoProvider liveInstanceInfoProvider) { _zkclient = zkclient; _manager = manager; _clusterName = manager.getClusterName(); @@ -82,6 +85,7 @@ public class ParticipantManagerHelper { _dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor(); _messagingService = (DefaultMessagingService) manager.getMessagingService(); _stateMachineEngine = manager.getStateMachineEngine(); + _liveInstanceInfoProvider = liveInstanceInfoProvider; } public void joinCluster() { @@ -92,8 +96,8 @@ public class ParticipantManagerHelper { new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster( _manager.getClusterName()).build(); autoJoin = - Boolean - .parseBoolean(_configAccessor.get(scope, HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)); + Boolean.parseBoolean(_configAccessor.get(scope, + ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)); LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin); } catch (Exception e) { // autoJoin is false @@ -128,6 +132,19 @@ public class ParticipantManagerHelper { liveInstance.setHelixVersion(_manager.getVersion()); liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName()); + // LiveInstanceInfoProvider liveInstanceInfoProvider = _manager._liveInstanceInfoProvider; + if (_liveInstanceInfoProvider != null) { + LOG.info("invoke liveInstanceInfoProvider"); + ZNRecord additionalLiveInstanceInfo = + _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo(); + if (additionalLiveInstanceInfo != null) { + additionalLiveInstanceInfo.merge(liveInstance.getRecord()); + ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName); + liveInstance = new LiveInstance(mergedLiveInstance); + LOG.info("instanceName: " + _instanceName + ", mergedLiveInstance: " + liveInstance); + } + } + boolean retry; do { retry = false; @@ -250,7 +267,7 @@ public class ParticipantManagerHelper { } } - public void setupMsgHandler() { + public void setupMsgHandler() throws Exception { _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), _stateMachineEngine); _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);
