Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1642#discussion_r76274105 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java --- @@ -17,135 +17,541 @@ */ package org.apache.storm.daemon.supervisor; +import java.io.File; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.commons.io.FileUtils; import org.apache.storm.Config; import org.apache.storm.StormTimer; -import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.DaemonCommon; +import org.apache.storm.daemon.supervisor.Slot.MachineState; +import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction; import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; +import org.apache.storm.event.EventManager; import org.apache.storm.event.EventManagerImp; +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.NodeInfo; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.generated.WorkerResources; +import 
org.apache.storm.localizer.AsyncLocalizer; +import org.apache.storm.localizer.ILocalizer; import org.apache.storm.localizer.Localizer; import org.apache.storm.messaging.IContext; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; import org.apache.storm.utils.VersionInfo; +import org.apache.zookeeper.data.ACL; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.InterruptedIOException; -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; +public class Supervisor implements DaemonCommon, AutoCloseable { + + public class ReadStateThread implements Runnable, AutoCloseable { + private final Map<String, Object> superConf; + private final IStormClusterState stormClusterState; + private final EventManager syncSupEventManager; + private final AtomicReference<Map<String, Map<String, Object>>> assignmentVersions; + private final Map<Integer, Slot> slots = new HashMap<>(); + private final AtomicInteger readRetry = new AtomicInteger(0); + private final String assignmentId; + private final ISupervisor iSuper; + private final ILocalizer localizer; + private final ContainerLauncher launcher; + private final String host; + private final LocalState localState; + private final IStormClusterState clusterState; + private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments; + + public ReadStateThread(EventManager syncSupEventManager) throws Exception { + this(getConf(), getStormClusterState(), syncSupEventManager, + getAssignmentId(), getiSupervisor(), + getAsyncLocalizer(), getHostName(), + getLocalState(), getStormClusterState(), + getCurrAssignment(), getSharedContext()); + } + + public 
ReadStateThread(Map<String, Object> superConf, IStormClusterState stormClusterState, + EventManager syncSupEventManager, String assignmentId, ISupervisor iSuper, + ILocalizer localizer, String host, LocalState localState, + IStormClusterState clusterState, AtomicReference<Map<Long, LocalAssignment>> cachedAssignments, + IContext sharedContext) throws Exception{ + this.superConf = superConf; + this.stormClusterState = stormClusterState; + this.syncSupEventManager = syncSupEventManager; + this.assignmentVersions = new AtomicReference<Map<String, Map<String, Object>>>(new HashMap<String, Map<String, Object>>()); + this.assignmentId = assignmentId; + this.iSuper = iSuper; + this.localizer = localizer; + this.host = host; + this.localState = localState; + this.clusterState = clusterState; + this.cachedAssignments = cachedAssignments; + + this.launcher = ContainerLauncher.mk(superConf, assignmentId, sharedContext); + + @SuppressWarnings("unchecked") + List<Number> ports = (List<Number>)superConf.get(Config.SUPERVISOR_SLOTS_PORTS); + for (Number port: ports) { + slots.put(port.intValue(), mkSlot(port.intValue())); + } + } + + private Slot mkSlot(int port) throws Exception { + Slot slot = new Slot(localizer, superConf, launcher, host, port, + localState, clusterState, iSuper, cachedAssignments); + slot.start(); + return slot; + } + + @Override + public synchronized void run() { + try { + Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager); + List<String> stormIds = stormClusterState.assignments(syncCallback); + Map<String, Map<String, Object>> assignmentsSnapshot = + getAssignmentsSnapshot(stormClusterState, stormIds, assignmentVersions.get(), syncCallback); + + Map<Integer, LocalAssignment> allAssignments = + readAssignments(assignmentsSnapshot, assignmentId, readRetry); + if (allAssignments == null) { + //Something odd happened try again later + return; + } + Map<String, List<ProfileRequest>> topoIdToProfilerActions = 
getProfileActions(stormClusterState, stormIds); + + HashSet<Integer> assignedPorts = new HashSet<>(); + LOG.debug("Synchronizing supervisor"); + LOG.debug("All assignment: {}", allAssignments); + LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions); + for (Integer port: allAssignments.keySet()) { + if (iSuper.confirmAssigned(port)) { + assignedPorts.add(port); + } + } + HashSet<Integer> allPorts = new HashSet<>(assignedPorts); + allPorts.addAll(slots.keySet()); + + Map<Integer, Set<TopoProfileAction>> filtered = new HashMap<>(); + for (Entry<String, List<ProfileRequest>> entry: topoIdToProfilerActions.entrySet()) { + String topoId = entry.getKey(); + if (entry.getValue() != null) { + for (ProfileRequest req: entry.getValue()) { + NodeInfo ni = req.get_nodeInfo(); + if (host.equals(ni.get_node())) { + Long port = ni.get_port().iterator().next(); + Set<TopoProfileAction> actions = filtered.get(port); + if (actions == null) { + actions = new HashSet<>(); + filtered.put(port.intValue(), actions); + } + actions.add(new TopoProfileAction(topoId, req)); + } + } + } + } + + for (Integer port: allPorts) { + Slot slot = slots.get(port); + if (slot == null) { + slot = mkSlot(port); + slots.put(port, slot); + } + slot.setNewAssignment(allAssignments.get(port)); + slot.addProfilerActions(filtered.get(port)); + } + + } catch (Exception e) { + LOG.error("Failed to Sync Supervisor", e); + throw new RuntimeException(e); + } + } + + protected Map<String, Map<String, Object>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> topoIds, + Map<String, Map<String, Object>> localAssignmentVersion, Runnable callback) throws Exception { + Map<String, Map<String, Object>> updateAssignmentVersion = new HashMap<>(); + for (String topoId : topoIds) { + Integer recordedVersion = -1; + Integer version = stormClusterState.assignmentVersion(topoId, callback); + if (localAssignmentVersion.containsKey(topoId) && localAssignmentVersion.get(topoId) != null) { 
+ recordedVersion = (Integer) localAssignmentVersion.get(topoId).get(IStateStorage.VERSION); + } + if (version == null) { + // ignore + } else if (version == recordedVersion) { + updateAssignmentVersion.put(topoId, localAssignmentVersion.get(topoId)); + } else { + //TODO change this so we return an object not a map with magic keys + Map<String, Object> assignmentVersion = (Map<String, Object>) stormClusterState.assignmentInfoWithVersion(topoId, callback); + updateAssignmentVersion.put(topoId, assignmentVersion); + } + } + return updateAssignmentVersion; + } + + protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception { + Map<String, List<ProfileRequest>> ret = new HashMap<String, List<ProfileRequest>>(); + for (String stormId : stormIds) { + List<ProfileRequest> profileRequests = stormClusterState.getTopologyProfileRequests(stormId); + ret.put(stormId, profileRequests); + } + return ret; + } + + protected Map<Integer, LocalAssignment> readAssignments(Map<String, Map<String, Object>> assignmentsSnapshot, + String assignmentId, AtomicInteger retries) { + try { + Map<Integer, LocalAssignment> portLA = new HashMap<Integer, LocalAssignment>(); + for (Map.Entry<String, Map<String, Object>> assignEntry : assignmentsSnapshot.entrySet()) { + String topoId = assignEntry.getKey(); + Assignment assignment = (Assignment) assignEntry.getValue().get(IStateStorage.DATA); + + Map<Integer, LocalAssignment> portTasks = readMyExecutors(topoId, assignmentId, assignment); + + for (Map.Entry<Integer, LocalAssignment> entry : portTasks.entrySet()) { + + Integer port = entry.getKey(); + + LocalAssignment la = entry.getValue(); + + if (!portLA.containsKey(port)) { + portLA.put(port, la); + } else { + throw new RuntimeException("Should not have multiple topologys assigned to one port"); + } + } + } + retries.set(0); + return portLA; + } catch (RuntimeException e) { + if (retries.get() > 2) { --- End diff -- 
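This should be configurable: the retry threshold (`retries.get() > 2`) is hard-coded and should come from a config setting instead.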
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---