Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/1642#discussion_r78993008 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java --- @@ -17,135 +17,231 @@ */ package org.apache.storm.daemon.supervisor; +import java.io.File; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.commons.io.FileUtils; import org.apache.storm.Config; import org.apache.storm.StormTimer; -import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.DaemonCommon; import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; +import org.apache.storm.event.EventManager; import org.apache.storm.event.EventManagerImp; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.localizer.AsyncLocalizer; +import org.apache.storm.localizer.ILocalizer; import org.apache.storm.localizer.Localizer; import org.apache.storm.messaging.IContext; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; import org.apache.storm.utils.VersionInfo; +import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import 
java.io.InterruptedIOException; -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; - -public class Supervisor { +public class Supervisor implements DaemonCommon, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class); + private final Map<String, Object> conf; + private final IContext sharedContext; + private volatile boolean active; + private final ISupervisor iSupervisor; + private final Utils.UptimeComputer upTime; + private final String stormVersion; + private final IStormClusterState stormClusterState; + private final LocalState localState; + private final String supervisorId; + private final String assignmentId; + private final String hostName; + // used for reporting used ports when heartbeating + private final AtomicReference<Map<Long, LocalAssignment>> currAssignment; + private final StormTimer heartbeatTimer; + private final StormTimer eventTimer; + private final StormTimer blobUpdateTimer; + private final Localizer localizer; + private final ILocalizer asyncLocalizer; + private EventManager eventManager; + private ReadClusterState readState; + + private Supervisor(ISupervisor iSupervisor) throws IOException { + this(Utils.readStormConfig(), null, iSupervisor); + } + + public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor) throws IOException { + this.conf = conf; + this.iSupervisor = iSupervisor; + this.active = true; + this.upTime = Utils.makeUptimeComputer(); + this.stormVersion = VersionInfo.getVersion(); + this.sharedContext = sharedContext; + + iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf)); + + List<ACL> acls = null; + if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { + acls = SupervisorUtils.supervisorZkAcls(); + } + + try { + this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR)); + } catch (Exception e) { + 
LOG.error("supervisor can't create stormClusterState"); + throw Utils.wrapInRuntime(e); + } + + try { + this.localState = ConfigUtils.supervisorState(conf); + this.localizer = Utils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf)); + this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer); + } catch (IOException e) { + throw Utils.wrapInRuntime(e); + } + this.supervisorId = iSupervisor.getSupervisorId(); + this.assignmentId = iSupervisor.getAssignmentId(); + + try { + this.hostName = Utils.hostname(conf); + } catch (UnknownHostException e) { + throw Utils.wrapInRuntime(e); + } + + this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>()); + + this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler()); + + this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler()); + + this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler()); + } + + public String getId() { + return supervisorId; + } - private SyncProcessEvent localSyncProcess; + IContext getSharedContext() { + return sharedContext; + } + + public Map<String, Object> getConf() { + return conf; + } + + public ISupervisor getiSupervisor() { + return iSupervisor; + } + + public Utils.UptimeComputer getUpTime() { + return upTime; + } + + public String getStormVersion() { + return stormVersion; + } + + public IStormClusterState getStormClusterState() { + return stormClusterState; + } - public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) { - this.localSyncProcess = localSyncProcess; + LocalState getLocalState() { + return localState; } + public String getAssignmentId() { + return assignmentId; + } + + public String getHostName() { + return hostName; + } + + public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() { + return currAssignment; + } + + public Localizer getLocalizer() { + return localizer; + } + + ILocalizer getAsyncLocalizer() { + 
return asyncLocalizer; + } + + EventManager getEventManger() { + return eventManager; + } + /** - * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary - * - * @param conf - * @param sharedContext - * @param iSupervisor - * @return - * @throws Exception + * Launch the supervisor */ - public SupervisorManager mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception { - SupervisorManager supervisorManager = null; - try { - LOG.info("Starting Supervisor with conf {}", conf); - iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf)); - String path = ConfigUtils.supervisorTmpDir(conf); - FileUtils.cleanDirectory(new File(path)); - - final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor); - Localizer localizer = supervisorData.getLocalizer(); - - SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData); - hb.run(); - // should synchronize supervisor so it doesn't launch anything after being down (optimization) - Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); - supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb); - - Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf); - for (String stormId : downloadedStormIds) { - SupervisorUtils.addBlobReferences(localizer, stormId, conf); - } - // do this after adding the references so we don't try to clean things being used - localizer.startCleaner(); + public void launch() throws Exception { + LOG.info("Starting Supervisor with conf {}", conf); + String path = ConfigUtils.supervisorTmpDir(conf); + FileUtils.cleanDirectory(new File(path)); - EventManagerImp syncSupEventManager = new EventManagerImp(false); - EventManagerImp syncProcessManager = new EventManagerImp(false); - - SyncProcessEvent syncProcessEvent = null; - if 
(ConfigUtils.isLocalMode(conf)) { - localSyncProcess.init(supervisorData); - syncProcessEvent = localSyncProcess; - } else { - syncProcessEvent = new SyncProcessEvent(supervisorData); - } + Localizer localizer = getLocalizer(); - SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager); - UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData); - RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData); + SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this); + hb.run(); + // should synchronize supervisor so it doesn't launch anything after being down (optimization) + Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); + heartbeatTimer.scheduleRecurring(0, heartbeatFrequency, hb); - if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) { - StormTimer eventTimer = supervisorData.getEventTimer(); - // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up - // to date even if callbacks don't all work exactly right - eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager)); + this.eventManager = new EventManagerImp(false); + this.readState = new ReadClusterState(this); + + Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf); --- End diff -- Nitpick: Download isn't two words
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---