Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1642#discussion_r78993008
  
    --- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---
    @@ -17,135 +17,231 @@
      */
     package org.apache.storm.daemon.supervisor;
     
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.UnknownHostException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.AtomicReference;
    +
     import org.apache.commons.io.FileUtils;
     import org.apache.storm.Config;
     import org.apache.storm.StormTimer;
    -import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.daemon.DaemonCommon;
     import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
     import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
     import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
    +import org.apache.storm.event.EventManager;
     import org.apache.storm.event.EventManagerImp;
    +import org.apache.storm.generated.LocalAssignment;
    +import org.apache.storm.localizer.AsyncLocalizer;
    +import org.apache.storm.localizer.ILocalizer;
     import org.apache.storm.localizer.Localizer;
     import org.apache.storm.messaging.IContext;
     import org.apache.storm.metric.StormMetricsRegistry;
     import org.apache.storm.scheduler.ISupervisor;
     import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.LocalState;
    +import org.apache.storm.utils.Time;
     import org.apache.storm.utils.Utils;
     import org.apache.storm.utils.VersionInfo;
    +import org.apache.zookeeper.data.ACL;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.io.File;
    -import java.io.InterruptedIOException;
    -import java.util.Collection;
    -import java.util.Map;
    -import java.util.Set;
    -import java.util.concurrent.Callable;
    -
    -public class Supervisor {
    +public class Supervisor implements DaemonCommon, AutoCloseable {
         private static final Logger LOG = 
LoggerFactory.getLogger(Supervisor.class);
    +    private final Map<String, Object> conf;
    +    private final IContext sharedContext;
    +    private volatile boolean active;
    +    private final ISupervisor iSupervisor;
    +    private final Utils.UptimeComputer upTime;
    +    private final String stormVersion;
    +    private final IStormClusterState stormClusterState;
    +    private final LocalState localState;
    +    private final String supervisorId;
    +    private final String assignmentId;
    +    private final String hostName;
    +    // used for reporting used ports when heartbeating
    +    private final AtomicReference<Map<Long, LocalAssignment>> 
currAssignment;
    +    private final StormTimer heartbeatTimer;
    +    private final StormTimer eventTimer;
    +    private final StormTimer blobUpdateTimer;
    +    private final Localizer localizer;
    +    private final ILocalizer asyncLocalizer;
    +    private EventManager eventManager;
    +    private ReadClusterState readState;
    +    
    +    private Supervisor(ISupervisor iSupervisor) throws IOException {
    +        this(Utils.readStormConfig(), null, iSupervisor);
    +    }
    +    
    +    public Supervisor(Map<String, Object> conf, IContext sharedContext, 
ISupervisor iSupervisor) throws IOException {
    +        this.conf = conf;
    +        this.iSupervisor = iSupervisor;
    +        this.active = true;
    +        this.upTime = Utils.makeUptimeComputer();
    +        this.stormVersion = VersionInfo.getVersion();
    +        this.sharedContext = sharedContext;
    +        
    +        iSupervisor.prepare(conf, 
ConfigUtils.supervisorIsupervisorDir(conf));
    +        
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = SupervisorUtils.supervisorZkAcls();
    +        }
    +
    +        try {
    +            this.stormClusterState = 
ClusterUtils.mkStormClusterState(conf, acls, new 
ClusterStateContext(DaemonType.SUPERVISOR));
    +        } catch (Exception e) {
    +            LOG.error("supervisor can't create stormClusterState");
    +            throw Utils.wrapInRuntime(e);
    +        }
    +
    +        try {
    +            this.localState = ConfigUtils.supervisorState(conf);
    +            this.localizer = Utils.createLocalizer(conf, 
ConfigUtils.supervisorLocalDir(conf));
    +            this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
    +        } catch (IOException e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +        this.supervisorId = iSupervisor.getSupervisorId();
    +        this.assignmentId = iSupervisor.getAssignmentId();
    +
    +        try {
    +            this.hostName = Utils.hostname(conf);
    +        } catch (UnknownHostException e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +
    +        this.currAssignment = new AtomicReference<Map<Long, 
LocalAssignment>>(new HashMap<Long,LocalAssignment>());
    +
    +        this.heartbeatTimer = new StormTimer(null, new 
DefaultUncaughtExceptionHandler());
    +
    +        this.eventTimer = new StormTimer(null, new 
DefaultUncaughtExceptionHandler());
    +
    +        this.blobUpdateTimer = new StormTimer("blob-update-timer", new 
DefaultUncaughtExceptionHandler());
    +    }
    +    
    +    public String getId() {
    +        return supervisorId;
    +    }
         
    -    private SyncProcessEvent localSyncProcess;
    +    IContext getSharedContext() {
    +        return sharedContext;
    +    }
    +
    +    public Map<String, Object> getConf() {
    +        return conf;
    +    }
    +
    +    public ISupervisor getiSupervisor() {
    +        return iSupervisor;
    +    }
    +
    +    public Utils.UptimeComputer getUpTime() {
    +        return upTime;
    +    }
    +
    +    public String getStormVersion() {
    +        return stormVersion;
    +    }
    +
    +    public IStormClusterState getStormClusterState() {
    +        return stormClusterState;
    +    }
     
    -    public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
    -        this.localSyncProcess = localSyncProcess;
    +    LocalState getLocalState() {
    +        return localState;
         }
     
    +    public String getAssignmentId() {
    +        return assignmentId;
    +    }
    +
    +    public String getHostName() {
    +        return hostName;
    +    }
    +
    +    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() 
{
    +        return currAssignment;
    +    }
    +
    +    public Localizer getLocalizer() {
    +        return localizer;
    +    }
    +    
    +    ILocalizer getAsyncLocalizer() {
    +        return asyncLocalizer;
    +    }
    +    
    +    EventManager getEventManger() {
    +        return eventManager;
    +    }
    +    
         /**
    -     * in local state, supervisor stores who its current assignments are 
another thread launches events to restart any dead processes if necessary
    -     * 
    -     * @param conf
    -     * @param sharedContext
    -     * @param iSupervisor
    -     * @return
    -     * @throws Exception
    +     * Launch the supervisor
          */
    -    public SupervisorManager mkSupervisor(final Map conf, IContext 
sharedContext, ISupervisor iSupervisor) throws Exception {
    -        SupervisorManager supervisorManager = null;
    -        try {
    -            LOG.info("Starting Supervisor with conf {}", conf);
    -            iSupervisor.prepare(conf, 
ConfigUtils.supervisorIsupervisorDir(conf));
    -            String path = ConfigUtils.supervisorTmpDir(conf);
    -            FileUtils.cleanDirectory(new File(path));
    -
    -            final SupervisorData supervisorData = new SupervisorData(conf, 
sharedContext, iSupervisor);
    -            Localizer localizer = supervisorData.getLocalizer();
    -
    -            SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, 
supervisorData);
    -            hb.run();
    -            // should synchronize supervisor so it doesn't launch anything 
after being down (optimization)
    -            Integer heartbeatFrequency = 
Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
    -            supervisorData.getHeartbeatTimer().scheduleRecurring(0, 
heartbeatFrequency, hb);
    -
    -            Set<String> downloadedStormIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
    -            for (String stormId : downloadedStormIds) {
    -                SupervisorUtils.addBlobReferences(localizer, stormId, 
conf);
    -            }
    -            // do this after adding the references so we don't try to 
clean things being used
    -            localizer.startCleaner();
    +    public void launch() throws Exception {
    +        LOG.info("Starting Supervisor with conf {}", conf);
    +        String path = ConfigUtils.supervisorTmpDir(conf);
    +        FileUtils.cleanDirectory(new File(path));
     
    -            EventManagerImp syncSupEventManager = new 
EventManagerImp(false);
    -            EventManagerImp syncProcessManager = new 
EventManagerImp(false);
    -
    -            SyncProcessEvent syncProcessEvent = null;
    -            if (ConfigUtils.isLocalMode(conf)) {
    -                localSyncProcess.init(supervisorData);
    -                syncProcessEvent = localSyncProcess;
    -            } else {
    -                syncProcessEvent = new SyncProcessEvent(supervisorData);
    -            }
    +        Localizer localizer = getLocalizer();
     
    -            SyncSupervisorEvent syncSupervisorEvent = new 
SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, 
syncProcessManager);
    -            UpdateBlobs updateBlobsThread = new 
UpdateBlobs(supervisorData);
    -            RunProfilerActions runProfilerActionThread = new 
RunProfilerActions(supervisorData);
    +        SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
    +        hb.run();
    +        // should synchronize supervisor so it doesn't launch anything 
after being down (optimization)
    +        Integer heartbeatFrequency = 
Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
    +        heartbeatTimer.scheduleRecurring(0, heartbeatFrequency, hb);
     
    -            if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
    -                StormTimer eventTimer = supervisorData.getEventTimer();
    -                // This isn't strictly necessary, but it doesn't hurt and 
ensures that the machine stays up
    -                // to date even if callbacks don't all work exactly right
    -                eventTimer.scheduleRecurring(0, 10, new 
EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
    +        this.eventManager = new EventManagerImp(false);
    +        this.readState = new ReadClusterState(this);
    +        
    +        Set<String> downloadedStormIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
    --- End diff --
    
    Nitpick: Download isn't two words


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to