GitHub user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1184#discussion_r55942812
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
    @@ -0,0 +1,672 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon.supervisor;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.container.cgroup.CgroupManager;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.LSWorkerHeartbeat;
    +import org.apache.storm.generated.LocalAssignment;
    +import org.apache.storm.generated.WorkerResources;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.LocalState;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.eclipse.jetty.util.ConcurrentHashSet;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.yaml.snakeyaml.Yaml;
    +
    +import java.io.File;
    +import java.io.FileWriter;
    +import java.io.IOException;
    +import java.util.*;
    +
    +/**
    + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr
    + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker
    + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers
    + * launch
    + */
    +public class SyncProcessEvent implements Runnable {
    +
    +    private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
    +
    +    private  LocalState localState;
    +    private  SupervisorData supervisorData;
    +    public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1);
    +
    +    private class ProcessExitCallback implements Utils.ExitCodeCallable {
    +        private final String logPrefix;
    +        private final String workerId;
    +
    +        public ProcessExitCallback(String logPrefix, String workerId) {
    +            this.logPrefix = logPrefix;
    +            this.workerId = workerId;
    +        }
    +
    +        @Override
    +        public Object call() throws Exception {
    +            return null;
    +        }
    +
    +        @Override
    +        public Object call(int exitCode) {
    +            LOG.info("{} exited with code: {}", logPrefix, exitCode);
    +            supervisorData.getDeadWorkers().add(workerId);
    +            return null;
    +        }
    +    }
    +
    +    public SyncProcessEvent(){
    +
    +    }
    +    public SyncProcessEvent(SupervisorData supervisorData) {
    +        init(supervisorData);
    +    }
    +
    +    //TODO: initData is intended for the local supervisor, so we will remove it after porting worker.clj to java
    +    public void init(SupervisorData supervisorData){
    +        this.supervisorData = supervisorData;
    +        this.localState = supervisorData.getLocalState();
    +    }
    +
    +
    +    /**
    +     * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file -
    +     * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new
    +     * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait
    +     * for workers launch
    +     */
    +    @Override
    +    public void run() {
    +        LOG.debug("Syncing processes");
    +        try {
    +            Map conf = supervisorData.getConf();
    +            Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
    +
    +            if (assignedExecutors == null) {
    +                assignedExecutors = new HashMap<>();
    +            }
    +            int now = Time.currentTimeSecs();
    +
    +            Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now);
    +
    +            Set<String> keeperWorkerIds = new HashSet<>();
    +            Set<Integer> keepPorts = new HashSet<>();
    +            for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
    +                StateHeartbeat stateHeartbeat = entry.getValue();
    +                if (stateHeartbeat.getState() == State.VALID) {
    +                    keeperWorkerIds.add(entry.getKey());
    +                    keepPorts.add(stateHeartbeat.getHeartbeat().get_port());
    +                }
    +            }
    +            Map<Integer, LocalAssignment> reassignExecutors = getReassignExecutors(assignedExecutors, keepPorts);
    +            Map<Integer, String> newWorkerIds = new HashMap<>();
    +            for (Integer port : reassignExecutors.keySet()) {
    +                newWorkerIds.put(port, Utils.uuid());
    +            }
    +            LOG.debug("Syncing processes");
    +            LOG.debug("Assigned executors: {}", assignedExecutors);
    +            LOG.debug("Allocated: {}", localWorkerStats);
    +
    +            for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
    +                StateHeartbeat stateHeartbeat = entry.getValue();
    +                if (stateHeartbeat.getState() != State.VALID) {
    +                    LOG.info("Shutting down and clearing state for id {}, 
Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
    +                            stateHeartbeat.getState(), 
stateHeartbeat.getHeartbeat());
    +                    shutWorker(supervisorData, entry.getKey());
    +                }
    +            }
    +            // start new workers
    +            Map<String, Integer> newWorkerPortToIds = startNewWorkers(newWorkerIds, reassignExecutors);
    +
    +            Map<String, Integer> allWorkerPortToIds = new HashMap<>();
    +            Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
    +            for (String keeper : keeperWorkerIds) {
    +                allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper));
    +            }
    +            allWorkerPortToIds.putAll(newWorkerPortToIds);
    +            localState.setApprovedWorkers(allWorkerPortToIds);
    +            waitForWorkersLaunch(conf, newWorkerPortToIds.keySet());
    +
    +        } catch (Exception e) {
    +            LOG.error("Failed Sync Process", e);
    +            throw Utils.wrapInRuntime(e);
    +        }
    +
    +    }
    +
    +    protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception {
    +        int startTime = Time.currentTimeSecs();
    +        int timeOut = (int) conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS);
    +        for (String workerId : workerIds) {
    +            LocalState localState = ConfigUtils.workerState(conf, workerId);
    +            while (true) {
    +                LSWorkerHeartbeat hb = localState.getWorkerHeartBeat();
    +                if (hb != null || (Time.currentTimeSecs() - startTime) > timeOut)
    +                    break;
    +                LOG.info("{} still hasn't started", workerId);
    +                Time.sleep(500);
    +            }
    +            if (localState.getWorkerHeartBeat() == null) {
    +                LOG.info("Worker {} failed to start", workerId);
    +            }
    +        }
    +    }
    +
    +    protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
    +        Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>();
    +        reassignExecutors.putAll(assignExecutors);
    +        for (Integer port : keepPorts) {
    +            reassignExecutors.remove(port);
    +        }
    +        return reassignExecutors;
    +    }
    +    
    +    /**
    +     * Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead
    +     * 
    +     * @param assignedExecutors
    +     * @return
    +     * @throws Exception
    +     */
    +    public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
    +        Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>();
    +        Map conf = supervisorData.getConf();
    +        LocalState localState = supervisorData.getLocalState();
    +        Map<String, LSWorkerHeartbeat> idToHeartbeat = SupervisorUtils.readWorkerHeartbeats(conf);
    +        Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
    +        Set<String> approvedIds = new HashSet<>();
    +        if (approvedWorkers != null) {
    +            approvedIds.addAll(approvedWorkers.keySet());
    +        }
    +        for (Map.Entry<String, LSWorkerHeartbeat> entry : idToHeartbeat.entrySet()) {
    +            String workerId = entry.getKey();
    +            LSWorkerHeartbeat whb = entry.getValue();
    +            State state;
    +            if (whb == null) {
    +                state = State.NOT_STARTED;
    +            } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) {
    +                state = State.DISALLOWED;
    +            } else if (supervisorData.getDeadWorkers().contains(workerId)) {
    +                LOG.info("Worker Process {} has died", workerId);
    +                state = State.TIMED_OUT;
    +            } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) {
    +                state = State.TIMED_OUT;
    +            } else {
    +                state = State.VALID;
    +            }
    +            LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor 
time-secs {}", workerId, state, whb, now);
    +            workerIdHbstate.put(workerId, new StateHeartbeat(state, whb));
    +        }
    +        return workerIdHbstate;
    +    }
    +
    +    protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) {
    +        LocalAssignment localAssignment = assignedExecutors.get(whb.get_port());
    +        if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) {
    +            return false;
    +        }
    +        List<ExecutorInfo> executorInfos = new ArrayList<>();
    +        executorInfos.addAll(whb.get_executors());
    +        // remove SYSTEM_EXECUTOR_ID
    +        executorInfos.remove(SYSTEM_EXECUTOR_INFO);
    +        List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors();
    +
    +        if (localExecuorInfos.size() != executorInfos.size())
    +            return false;
    +
    +        for (ExecutorInfo executorInfo : localExecuorInfos){
    +            if (!executorInfos.contains(executorInfo))
    +                return false;
    +        }
    +        return true;
    +    }
    +
    +    /**
    +     * launch a worker in local mode.
    +     */
    +    protected void launchWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
    +        // port this function after porting worker to java
    +    }
    +
    +    protected String getWorkerClassPath(String stormJar, Map stormConf) {
    +        List<String> topoClasspath = new ArrayList<>();
    +        Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
    +
    +        if (object instanceof List) {
    +            topoClasspath.addAll((List<String>) object);
    +        } else if (object instanceof String){
    +            topoClasspath.add((String)object);
    +        }else {
    +            //ignore
    +        }
    +        String classPath = Utils.workerClasspath();
    +        String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
    +        return Utils.addToClasspath(classAddPath, topoClasspath);
    +    }
    +
    +    /**
    +     * "Generates runtime childopts by replacing keys with topology-id, 
worker-id, port, mem-onheap"
    +     * 
    +     * @param value
    +     * @param workerId
    +     * @param stormId
    +     * @param port
    +     * @param memOnheap
    +     */
    +    public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
    +        List<String> rets = new ArrayList<>();
    --- End diff ---
    
    The Clojure code would have returned null if value is null. Also, the default type was assumed to be a String.
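    
    To make that concrete, here is a minimal sketch (my illustration, not the patch's code) of a substituteChildopts that keeps the old Clojure behavior: return null when value is null, and accept either a plain String or a List of Strings. The placeholder names and the whitespace split are assumptions based on the javadoc above, and the snippet relies on the file's existing java.util.* import.
    
    ```java
    public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
        if (value == null) {
            return null; // the Clojure version returned nil for a nil value
        }
        List<String> childopts = new ArrayList<>();
        if (value instanceof String) {
            // default type was assumed to be a String; splitting on whitespace is one reading of the old behavior
            childopts.addAll(Arrays.asList(((String) value).trim().split("\\s+")));
        } else if (value instanceof List) {
            for (Object o : (List<?>) value) {
                childopts.add(o.toString());
            }
        }
        List<String> substituted = new ArrayList<>(childopts.size());
        for (String opt : childopts) {
            // placeholder names here are illustrative, not confirmed against the patch
            substituted.add(opt
                    .replace("%ID%", String.valueOf(port))
                    .replace("%WORKER-ID%", workerId)
                    .replace("%TOPOLOGY-ID%", stormId)
                    .replace("%WORKER-PORT%", String.valueOf(port))
                    .replace("%HEAP-MEM%", String.valueOf(memOnheap)));
        }
        return substituted;
    }
    ```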

