http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java index 32aa0f1..9ca1d8a 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java @@ -17,33 +17,11 @@ */ package com.alibaba.jstorm.daemon.supervisor; -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.io.FileExistsException; -import org.apache.commons.io.FileUtils; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.Config; import backtype.storm.utils.LocalState; - import com.alibaba.jstorm.callback.RunnableCallback; import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.Cluster; import com.alibaba.jstorm.cluster.Common; -import com.alibaba.jstorm.cluster.StormBase; import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.daemon.worker.LocalAssignment; @@ -55,16 +33,26 @@ import com.alibaba.jstorm.utils.JStormServerUtils; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.PathUtils; import com.alibaba.jstorm.utils.TimeUtils; +import org.apache.commons.io.FileExistsException; +import org.apache.commons.io.FileUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.*; +import java.util.Map.Entry; /** - * supervisor SynchronizeSupervisor workflow (1) writer local assignment to - * LocalState (2) download new Assignment's topology (3) remove useless Topology - * (4) push one SyncProcessEvent to SyncProcessEvent's EventManager + * supervisor SynchronizeSupervisor workflow (1) writer local assignment to LocalState (2) download new Assignment's topology (3) remove useless Topology (4) + * push one SyncProcessEvent to SyncProcessEvent's EventManager + * @author Johnfang ([email protected]) */ class SyncSupervisorEvent extends RunnableCallback { - private static final Logger LOG = LoggerFactory - .getLogger(SyncSupervisorEvent.class); + private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); // private Supervisor supervisor; @@ -95,10 +83,8 @@ class SyncSupervisorEvent extends RunnableCallback { * @param localState * @param syncProcesses */ - public SyncSupervisorEvent(String supervisorId, Map conf, - EventManager processEventManager, EventManager syncSupEventManager, - StormClusterState stormClusterState, LocalState localState, - SyncProcessEvent syncProcesses, Heartbeat heartbeat) { + public SyncSupervisorEvent(String supervisorId, Map conf, EventManager processEventManager, EventManager syncSupEventManager, + StormClusterState stormClusterState, LocalState localState, SyncProcessEvent syncProcesses, Heartbeat heartbeat) { this.syncProcesses = syncProcesses; this.processEventManager = processEventManager; @@ -112,38 +98,30 @@ class SyncSupervisorEvent extends RunnableCallback { @Override public void run() { - LOG.debug("Synchronizing supervisor, interval seconds:" - + TimeUtils.time_delta(lastTime)); + LOG.debug("Synchronizing supervisor, interval seconds:" + TimeUtils.time_delta(lastTime)); lastTime = TimeUtils.current_time_secs(); try { - RunnableCallback syncCallback = - new EventManagerZkPusher(this, syncSupEventManager); + RunnableCallback syncCallback = new EventManagerZkPusher(this, syncSupEventManager); /** - * Step 1: get all assignments and register /ZK-dir/assignment and - * every assignment watch + * Step 1: get all assignments and register /ZK-dir/assignment and every assignment watch * */ - Map<String, Assignment> assignments = - Cluster.get_all_assignment(stormClusterState, syncCallback); + Map<String, Assignment> assignments = Cluster.get_all_assignment(stormClusterState, syncCallback); LOG.debug("Get all assignments " + assignments); /** - * Step 2: get topologyIds list from - * STORM-LOCAL-DIR/supervisor/stormdist/ + * Step 2: get topologyIds list from STORM-LOCAL-DIR/supervisor/stormdist/ */ - List<String> downloadedTopologyIds = - StormConfig.get_supervisor_toplogy_list(conf); + List<String> downloadedTopologyIds = StormConfig.get_supervisor_toplogy_list(conf); LOG.debug("Downloaded storm ids: " + downloadedTopologyIds); /** - * Step 3: get <port,LocalAssignments> from ZK local node's - * assignment + * Step 3: get <port,LocalAssignments> from ZK local node's assignment */ - Map<Integer, LocalAssignment> zkAssignment = - getLocalAssign(stormClusterState, supervisorId, assignments); + Map<Integer, LocalAssignment> zkAssignment = getLocalAssign(stormClusterState, supervisorId, assignments); Map<Integer, LocalAssignment> localAssignment; Set<String> updateTopologys; @@ -152,35 +130,31 @@ class SyncSupervisorEvent extends RunnableCallback { */ try { LOG.debug("Writing local assignment " + zkAssignment); - localAssignment = - (Map<Integer, LocalAssignment>) localState - .get(Common.LS_LOCAL_ASSIGNMENTS); + localAssignment = (Map<Integer, LocalAssignment>) localState.get(Common.LS_LOCAL_ASSIGNMENTS); if (localAssignment == null) { localAssignment = new HashMap<Integer, LocalAssignment>(); } localState.put(Common.LS_LOCAL_ASSIGNMENTS, zkAssignment); - updateTopologys = - getUpdateTopologys(localAssignment, zkAssignment); - Set<String> reDownloadTopologys = - getNeedReDownloadTopologys(localAssignment); + updateTopologys = getUpdateTopologys(localAssignment, zkAssignment, assignments); + Set<String> reDownloadTopologys = getNeedReDownloadTopologys(localAssignment); if (reDownloadTopologys != null) { updateTopologys.addAll(reDownloadTopologys); } } catch (IOException e) { - LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment - + " of localState failed"); + LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment + " of localState failed"); throw e; } /** * Step 5: download code from ZK */ - Map<String, String> topologyCodes = - getTopologyCodeLocations(assignments, supervisorId); + Map<String, String> topologyCodes = getTopologyCodeLocations(assignments, supervisorId); + + // downloadFailedTopologyIds which can't finished download binary from nimbus + Set<String> downloadFailedTopologyIds = new HashSet<String>(); - downloadTopology(topologyCodes, downloadedTopologyIds, - updateTopologys, assignments); + downloadTopology(topologyCodes, downloadedTopologyIds, updateTopologys, assignments, downloadFailedTopologyIds); /** * Step 6: remove any downloaded useless topology @@ -191,7 +165,7 @@ class SyncSupervisorEvent extends RunnableCallback { * Step 7: push syncProcesses Event */ // processEventManager.add(syncProcesses); - syncProcesses.run(zkAssignment); + syncProcesses.run(zkAssignment, downloadFailedTopologyIds); // If everything is OK, set the trigger to update heartbeat of // supervisor @@ -209,11 +183,9 @@ class SyncSupervisorEvent extends RunnableCallback { * @param conf * @param topologyId * @param masterCodeDir - * @param clusterMode * @throws IOException */ - private void downloadStormCode(Map conf, String topologyId, - String masterCodeDir) throws IOException, TException { + private void downloadStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException { String clusterMode = StormConfig.cluster_mode(conf); if (clusterMode.endsWith("distributed")) { @@ -224,17 +196,14 @@ class SyncSupervisorEvent extends RunnableCallback { } } - private void downloadLocalStormCode(Map conf, String topologyId, - String masterCodeDir) throws IOException, TException { + private void downloadLocalStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException { // STORM-LOCAL-DIR/supervisor/stormdist/storm-id - String stormroot = - StormConfig.supervisor_stormdist_root(conf, topologyId); + String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId); FileUtils.copyDirectory(new File(masterCodeDir), new File(stormroot)); - ClassLoader classloader = - Thread.currentThread().getContextClassLoader(); + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); String resourcesJar = resourcesJar(); @@ -244,20 +213,16 @@ class SyncSupervisorEvent extends RunnableCallback { if (resourcesJar != null) { - LOG.info("Extracting resources from jar at " + resourcesJar - + " to " + targetDir); + LOG.info("Extracting resources from jar at " + resourcesJar + " to " + targetDir); - JStormUtils.extract_dir_from_jar(resourcesJar, - StormConfig.RESOURCES_SUBDIR, stormroot);// extract dir + JStormUtils.extract_dir_from_jar(resourcesJar, StormConfig.RESOURCES_SUBDIR, stormroot);// extract dir // from jar;; // util.clj } else if (url != null) { - LOG.info("Copying resources at " + url.toString() + " to " - + targetDir); + LOG.info("Copying resources at " + url.toString() + " to " + targetDir); - FileUtils.copyDirectory(new File(url.getFile()), (new File( - targetDir))); + FileUtils.copyDirectory(new File(url.getFile()), (new File(targetDir))); } } @@ -271,27 +236,21 @@ class SyncSupervisorEvent extends RunnableCallback { * @throws IOException * @throws TException */ - private void downloadDistributeStormCode(Map conf, String topologyId, - String masterCodeDir) throws IOException, TException { + private void downloadDistributeStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException { // STORM_LOCAL_DIR/supervisor/tmp/(UUID) - String tmproot = - StormConfig.supervisorTmpDir(conf) + File.separator - + UUID.randomUUID().toString(); + String tmproot = StormConfig.supervisorTmpDir(conf) + File.separator + UUID.randomUUID().toString(); // STORM_LOCAL_DIR/supervisor/stormdist/topologyId - String stormroot = - StormConfig.supervisor_stormdist_root(conf, topologyId); + String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId); - JStormServerUtils.downloadCodeFromMaster(conf, tmproot, masterCodeDir, - topologyId, true); + JStormServerUtils.downloadCodeFromMaster(conf, tmproot, masterCodeDir, topologyId, true); // tmproot/stormjar.jar String localFileJarTmp = StormConfig.stormjar_path(tmproot); // extract dir from jar - JStormUtils.extract_dir_from_jar(localFileJarTmp, - StormConfig.RESOURCES_SUBDIR, tmproot); + JStormUtils.extract_dir_from_jar(localFileJarTmp, StormConfig.RESOURCES_SUBDIR, tmproot); File srcDir = new File(tmproot); File destDir = new File(stormroot); @@ -325,8 +284,7 @@ class SyncSupervisorEvent extends RunnableCallback { List<String> rtn = new ArrayList<String>(); int size = jarPaths.size(); for (int i = 0; i < size; i++) { - if (JStormUtils.zipContainsDir(jarPaths.get(i), - StormConfig.RESOURCES_SUBDIR)) { + if (JStormUtils.zipContainsDir(jarPaths.get(i), StormConfig.RESOURCES_SUBDIR)) { rtn.add(jarPaths.get(i)); } } @@ -342,24 +300,19 @@ class SyncSupervisorEvent extends RunnableCallback { * * @param stormClusterState * @param supervisorId - * @param callback * @throws Exception * @returns map: {port,LocalAssignment} */ - private Map<Integer, LocalAssignment> getLocalAssign( - StormClusterState stormClusterState, String supervisorId, - Map<String, Assignment> assignments) throws Exception { + private Map<Integer, LocalAssignment> getLocalAssign(StormClusterState stormClusterState, String supervisorId, Map<String, Assignment> assignments) + throws Exception { - Map<Integer, LocalAssignment> portLA = - new HashMap<Integer, LocalAssignment>(); + Map<Integer, LocalAssignment> portLA = new HashMap<Integer, LocalAssignment>(); for (Entry<String, Assignment> assignEntry : assignments.entrySet()) { String topologyId = assignEntry.getKey(); Assignment assignment = assignEntry.getValue(); - Map<Integer, LocalAssignment> portTasks = - readMyTasks(stormClusterState, topologyId, supervisorId, - assignment); + Map<Integer, LocalAssignment> portTasks = readMyTasks(stormClusterState, topologyId, supervisorId, assignment); if (portTasks == null) { continue; } @@ -374,8 +327,7 @@ class SyncSupervisorEvent extends RunnableCallback { if (!portLA.containsKey(port)) { portLA.put(port, la); } else { - throw new RuntimeException( - "Should not have multiple topologys assigned to one port"); + throw new RuntimeException("Should not have multiple topologys assigned to one port"); } } } @@ -389,30 +341,27 @@ class SyncSupervisorEvent extends RunnableCallback { * @param stormClusterState * @param topologyId * @param supervisorId - * @param callback * @return Map: {port, LocalAssignment} * @throws Exception */ - private Map<Integer, LocalAssignment> readMyTasks( - StormClusterState stormClusterState, String topologyId, - String supervisorId, Assignment assignmenInfo) throws Exception { + private Map<Integer, LocalAssignment> readMyTasks(StormClusterState stormClusterState, String topologyId, String supervisorId, Assignment assignmentInfo) + throws Exception { - Map<Integer, LocalAssignment> portTasks = - new HashMap<Integer, LocalAssignment>(); + Map<Integer, LocalAssignment> portTasks = new HashMap<Integer, LocalAssignment>(); - Set<ResourceWorkerSlot> workers = assignmenInfo.getWorkers(); + Set<ResourceWorkerSlot> workers = assignmentInfo.getWorkers(); if (workers == null) { - LOG.error("No worker of assignement's " + assignmenInfo); + LOG.error("No worker of assignment's " + assignmentInfo); return portTasks; } for (ResourceWorkerSlot worker : workers) { if (!supervisorId.equals(worker.getNodeId())) continue; - portTasks.put(worker.getPort(), new LocalAssignment(topologyId, - worker.getTasks(), Common.topologyIdToName(topologyId), - worker.getMemSize(), worker.getCpu(), worker.getJvm(), - assignmenInfo.getTimeStamp())); + portTasks.put( + worker.getPort(), + new LocalAssignment(topologyId, worker.getTasks(), Common.topologyIdToName(topologyId), worker.getMemSize(), worker.getCpu(), worker + .getJvm(), assignmentInfo.getTimeStamp())); } return portTasks; @@ -421,14 +370,10 @@ class SyncSupervisorEvent extends RunnableCallback { /** * get mastercodedir for every topology * - * @param stormClusterState - * @param callback * @throws Exception * @returns Map: <topologyId, master-code-dir> from zookeeper */ - public static Map<String, String> getTopologyCodeLocations( - Map<String, Assignment> assignments, String supervisorId) - throws Exception { + public static Map<String, String> getTopologyCodeLocations(Map<String, Assignment> assignments, String supervisorId) throws Exception { Map<String, String> rtn = new HashMap<String, String>(); for (Entry<String, Assignment> entry : assignments.entrySet()) { @@ -448,9 +393,8 @@ class SyncSupervisorEvent extends RunnableCallback { return rtn; } - public void downloadTopology(Map<String, String> topologyCodes, - List<String> downloadedTopologyIds, Set<String> updateTopologys, - Map<String, Assignment> assignments) throws Exception { + public void downloadTopology(Map<String, String> topologyCodes, List<String> downloadedTopologyIds, Set<String> updateTopologys, + Map<String, Assignment> assignments, Set<String> downloadFailedTopologyIds) throws Exception { Set<String> downloadTopologys = new HashSet<String>(); @@ -459,38 +403,53 @@ class SyncSupervisorEvent extends RunnableCallback { String topologyId = entry.getKey(); String masterCodeDir = entry.getValue(); - if (!downloadedTopologyIds.contains(topologyId) - || updateTopologys.contains(topologyId)) { + if (!downloadedTopologyIds.contains(topologyId) || updateTopologys.contains(topologyId)) { - LOG.info("Downloading code for storm id " + topologyId - + " from " + masterCodeDir); + LOG.info("Downloading code for storm id " + topologyId + " from " + masterCodeDir); - try { - downloadStormCode(conf, topologyId, masterCodeDir); - // Update assignment timeStamp - StormConfig.write_supervisor_topology_timestamp(conf, - topologyId, assignments.get(topologyId) - .getTimeStamp()); - } catch (IOException e) { - LOG.error(e + " downloadStormCode failed " + "topologyId:" - + topologyId + "masterCodeDir:" + masterCodeDir); + int retry = 0; + while (retry < 3) { + try { + downloadStormCode(conf, topologyId, masterCodeDir); + // Update assignment timeStamp + StormConfig.write_supervisor_topology_timestamp(conf, topologyId, assignments.get(topologyId).getTimeStamp()); + break; + } catch (IOException e) { + LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + masterCodeDir); - } catch (TException e) { - LOG.error(e + " downloadStormCode failed " + "topologyId:" - + topologyId + "masterCodeDir:" + masterCodeDir); + } catch (TException e) { + LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + masterCodeDir); + } + retry++; + } + if (retry < 3) { + LOG.info("Finished downloading code for storm id " + topologyId + " from " + masterCodeDir); + downloadTopologys.add(topologyId); + } else { + LOG.error("Cann't download code for storm id " + topologyId + " from " + masterCodeDir); + downloadFailedTopologyIds.add(topologyId); } - LOG.info("Finished downloading code for storm id " + topologyId - + " from " + masterCodeDir); - downloadTopologys.add(topologyId); + } + } + // clear directory of topologyId is dangerous , so it only clear the topologyId which + // isn't contained by downloadedTopologyIds + for (String topologyId : downloadFailedTopologyIds) { + if (!downloadedTopologyIds.contains(topologyId)) { + try { + String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId); + File destDir = new File(stormroot); + FileUtils.deleteQuietly(destDir); + } catch (Exception e) { + LOG.error("Cann't clear directory about storm id " + topologyId + " on supervisor "); + } } } updateTaskCleanupTimeout(downloadTopologys); } - public void removeUselessTopology(Map<String, String> topologyCodes, - List<String> downloadedTopologyIds) { + public void removeUselessTopology(Map<String, String> topologyCodes, List<String> downloadedTopologyIds) { for (String topologyId : downloadedTopologyIds) { if (!topologyCodes.containsKey(topologyId)) { @@ -499,9 +458,7 @@ class SyncSupervisorEvent extends RunnableCallback { String path = null; try { - path = - StormConfig.supervisor_stormdist_root(conf, - topologyId); + path = StormConfig.supervisor_stormdist_root(conf, topologyId); PathUtils.rmr(path); } catch (IOException e) { String errMsg = "rmr the path:" + path + "failed\n"; @@ -511,13 +468,11 @@ class SyncSupervisorEvent extends RunnableCallback { } } - private Set<String> getUpdateTopologys( - Map<Integer, LocalAssignment> localAssignments, - Map<Integer, LocalAssignment> zkAssignments) { + private Set<String> getUpdateTopologys(Map<Integer, LocalAssignment> localAssignments, Map<Integer, LocalAssignment> zkAssignments, + Map<String, Assignment> assignments) { Set<String> ret = new HashSet<String>(); if (localAssignments != null && zkAssignments != null) { - for (Entry<Integer, LocalAssignment> entry : localAssignments - .entrySet()) { + for (Entry<Integer, LocalAssignment> entry : localAssignments.entrySet()) { Integer port = entry.getKey(); LocalAssignment localAssignment = entry.getValue(); @@ -526,14 +481,11 @@ class SyncSupervisorEvent extends RunnableCallback { if (localAssignment == null || zkAssignment == null) continue; - if (localAssignment.getTopologyId().equals( - zkAssignment.getTopologyId()) - && localAssignment.getTimeStamp() < zkAssignment - .getTimeStamp()) + Assignment assignment = assignments.get(localAssignment.getTopologyId()); + if (localAssignment.getTopologyId().equals(zkAssignment.getTopologyId()) && assignment != null + && assignment.isTopologyChange(localAssignment.getTimeStamp())) if (ret.add(localAssignment.getTopologyId())) { - LOG.info("Topology-" + localAssignment.getTopologyId() - + " has been updated. LocalTs=" - + localAssignment.getTimeStamp() + ", ZkTs=" + LOG.info("Topology-" + localAssignment.getTopologyId() + " has been updated. LocalTs=" + localAssignment.getTimeStamp() + ", ZkTs=" + zkAssignment.getTimeStamp()); } } @@ -542,49 +494,37 @@ class SyncSupervisorEvent extends RunnableCallback { return ret; } - private Set<String> getNeedReDownloadTopologys( - Map<Integer, LocalAssignment> localAssignment) { - Set<String> reDownloadTopologys = - syncProcesses.getTopologyIdNeedDownload().getAndSet(null); + private Set<String> getNeedReDownloadTopologys(Map<Integer, LocalAssignment> localAssignment) { + Set<String> reDownloadTopologys = syncProcesses.getTopologyIdNeedDownload().getAndSet(null); if (reDownloadTopologys == null || reDownloadTopologys.size() == 0) return null; Set<String> needRemoveTopologys = new HashSet<String>(); - Map<Integer, String> portToStartWorkerId = - syncProcesses.getPortToWorkerId(); - for (Entry<Integer, LocalAssignment> entry : localAssignment - .entrySet()) { + Map<Integer, String> portToStartWorkerId = syncProcesses.getPortToWorkerId(); + for (Entry<Integer, LocalAssignment> entry : localAssignment.entrySet()) { if (portToStartWorkerId.containsKey(entry.getKey())) needRemoveTopologys.add(entry.getValue().getTopologyId()); } - LOG.debug( - "worker is starting on these topology, so delay download topology binary: " - + needRemoveTopologys); + LOG.debug("worker is starting on these topology, so delay download topology binary: " + needRemoveTopologys); reDownloadTopologys.removeAll(needRemoveTopologys); if (reDownloadTopologys.size() > 0) - LOG.info("Following topologys is going to re-download the jars, " - + reDownloadTopologys); + LOG.info("Following topologys is going to re-download the jars, " + reDownloadTopologys); return reDownloadTopologys; } private void updateTaskCleanupTimeout(Set<String> topologys) { Map topologyConf = null; - Map<String, Integer> taskCleanupTimeouts = - new HashMap<String, Integer>(); + Map<String, Integer> taskCleanupTimeouts = new HashMap<String, Integer>(); for (String topologyId : topologys) { try { - topologyConf = - StormConfig.read_supervisor_topology_conf(conf, - topologyId); + topologyConf = StormConfig.read_supervisor_topology_conf(conf, topologyId); } catch (IOException e) { LOG.info("Failed to read conf for " + topologyId); } Integer cleanupTimeout = null; if (topologyConf != null) { - cleanupTimeout = - JStormUtils.parseInt(topologyConf - .get(ConfigExtension.TASK_CLEANUP_TIMEOUT_SEC)); + cleanupTimeout = JStormUtils.parseInt(topologyConf.get(ConfigExtension.TASK_CLEANUP_TIMEOUT_SEC)); } if (cleanupTimeout == null) { @@ -596,9 +536,7 @@ class SyncSupervisorEvent extends RunnableCallback { Map<String, Integer> localTaskCleanupTimeouts = null; try { - localTaskCleanupTimeouts = - (Map<String, Integer>) localState - .get(Common.LS_TASK_CLEANUP_TIMEOUT); + localTaskCleanupTimeouts = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT); } catch (IOException e) { LOG.error("Failed to read local task cleanup timeout map", e); } @@ -609,8 +547,7 @@ class SyncSupervisorEvent extends RunnableCallback { localTaskCleanupTimeouts.putAll(taskCleanupTimeouts); try { - localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, - localTaskCleanupTimeouts); + localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, localTaskCleanupTimeouts); } catch (IOException e) { LOG.error("Failed to write local task cleanup timeout map", e); }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java index 81e4374..394c134 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java @@ -39,8 +39,7 @@ import com.alibaba.jstorm.utils.Pair; * */ public class BatchDrainerRunable extends DisruptorRunable { - private final static Logger LOG = LoggerFactory - .getLogger(BatchDrainerRunable.class); + private final static Logger LOG = LoggerFactory.getLogger(BatchDrainerRunable.class); public BatchDrainerRunable(WorkerData workerData) { super(workerData.getSendingQueue(), MetricDef.BATCH_DRAINER_THREAD); @@ -50,8 +49,7 @@ public class BatchDrainerRunable extends DisruptorRunable { @Override public void handleEvent(Object event, boolean endOfBatch) throws Exception { - Pair<IConnection, List<TaskMessage>> pair = - (Pair<IConnection, List<TaskMessage>>) event; + Pair<IConnection, List<TaskMessage>> pair = (Pair<IConnection, List<TaskMessage>>) event; pair.getFirst().send(pair.getSecond()); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java index a260323..47e73b8 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java @@ -17,25 +17,23 @@ */ package com.alibaba.jstorm.daemon.worker; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.generated.StormTopology; import backtype.storm.generated.StreamInfo; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import backtype.storm.utils.ThriftTopologyUtils; - import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.PathUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** * ContextMaker This class is used to create TopologyContext @@ -56,8 +54,7 @@ public class ContextMaker { @SuppressWarnings("rawtypes") public ContextMaker(WorkerData workerData) { /* - * Map stormConf, String topologyId, String workerId, HashMap<Integer, - * String> tasksToComponent, Integer port, List<Integer> workerTasks + * Map stormConf, String topologyId, String workerId, HashMap<Integer, String> tasksToComponent, Integer port, List<Integer> workerTasks */ this.workerData = workerData; this.workerTasks = JStormUtils.mk_list(workerData.getTaskids()); @@ -67,12 +64,9 @@ public class ContextMaker { String topologyId = workerData.getTopologyId(); String workerId = workerData.getWorkerId(); - String distroot = - StormConfig - .supervisor_stormdist_root(stormConf, topologyId); + String distroot = StormConfig.supervisor_stormdist_root(stormConf, topologyId); - resourcePath = - StormConfig.supervisor_storm_resources_path(distroot); + resourcePath = StormConfig.supervisor_storm_resources_path(distroot); pidDir = StormConfig.worker_pids_root(stormConf, workerId); @@ -85,43 +79,32 @@ public class ContextMaker { } } - public TopologyContext makeTopologyContext(StormTopology topology, - Integer taskId, clojure.lang.Atom openOrPrepareWasCalled) { + public TopologyContext makeTopologyContext(StormTopology topology, Integer taskId, clojure.lang.Atom openOrPrepareWasCalled) { Map stormConf = workerData.getStormConf(); String topologyId = workerData.getTopologyId(); - HashMap<String, Map<String, Fields>> componentToStreamToFields = - new HashMap<String, Map<String, Fields>>(); + HashMap<String, Map<String, Fields>> componentToStreamToFields = new HashMap<String, Map<String, Fields>>(); Set<String> components = ThriftTopologyUtils.getComponentIds(topology); for (String component : components) { - Map<String, Fields> streamToFieldsMap = - new HashMap<String, Fields>(); + Map<String, Fields> streamToFieldsMap = new HashMap<String, Fields>(); - Map<String, StreamInfo> streamInfoMap = - ThriftTopologyUtils.getComponentCommon(topology, component) - .get_streams(); + Map<String, StreamInfo> streamInfoMap = ThriftTopologyUtils.getComponentCommon(topology, component).get_streams(); for (Entry<String, StreamInfo> entry : streamInfoMap.entrySet()) { String streamId = entry.getKey(); StreamInfo streamInfo = entry.getValue(); - streamToFieldsMap.put(streamId, - new Fields(streamInfo.get_output_fields())); + streamToFieldsMap.put(streamId, new Fields(streamInfo.get_output_fields())); } componentToStreamToFields.put(component, streamToFieldsMap); } - return new TopologyContext(topology, stormConf, - workerData.getTasksToComponent(), - workerData.getComponentToSortedTasks(), - componentToStreamToFields, topologyId, resourcePath, pidDir, - taskId, workerData.getPort(), workerTasks, - workerData.getDefaultResources(), - workerData.getUserResources(), workerData.getExecutorData(), - workerData.getRegisteredMetrics(), openOrPrepareWasCalled); + return new TopologyContext(topology, stormConf, workerData.getTasksToComponent(), workerData.getComponentToSortedTasks(), componentToStreamToFields, + topologyId, resourcePath, pidDir, taskId, workerData.getPort(), workerTasks, workerData.getDefaultResources(), workerData.getUserResources(), + workerData.getExecutorData(), workerData.getRegisteredMetrics(), openOrPrepareWasCalled, workerData.getZkCluster()); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java index 3477cc4..c19947a 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java @@ -46,8 +46,7 @@ import com.alibaba.jstorm.utils.Pair; * */ public class DrainerRunable extends DisruptorRunable { - private final static Logger LOG = LoggerFactory - .getLogger(DrainerRunable.class); + private final static Logger LOG = LoggerFactory.getLogger(DrainerRunable.class); private DisruptorQueue transferQueue; private ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket; @@ -92,8 +91,7 @@ public class DrainerRunable extends DisruptorRunable { if (conn.isClosed() == true) { // if connection has been closed, just skip the package - LOG.debug("Skip one tuple of " + taskId - + ", due to close connection of " + nodePort); + LOG.debug("Skip one tuple of " + taskId + ", due to close connection of " + nodePort); return; } @@ -113,11 +111,8 @@ public class DrainerRunable extends DisruptorRunable { } public void handleFinish() { - for (Entry<IConnection, List<TaskMessage>> entry : dispatchMap - .entrySet()) { - Pair<IConnection, List<TaskMessage>> pair = - new Pair<IConnection, List<TaskMessage>>(entry.getKey(), - entry.getValue()); + for (Entry<IConnection, List<TaskMessage>> entry : dispatchMap.entrySet()) { + Pair<IConnection, List<TaskMessage>> pair = new Pair<IConnection, List<TaskMessage>>(entry.getKey(), entry.getValue()); sendingQueue.publish(pair); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java index 312c57f..1221680 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java @@ -38,8 +38,7 @@ public class LocalAssignment implements Serializable { private String jvm; private long timeStamp; - public LocalAssignment(String topologyId, Set<Integer> taskIds, - String topologyName, long mem, int cpu, String jvm, long timeStamp) { + public LocalAssignment(String topologyId, Set<Integer> taskIds, String topologyName, long mem, int cpu, String jvm, long timeStamp) { this.topologyId = topologyId; this.taskIds = new HashSet<Integer>(taskIds); this.topologyName = topologyName; @@ -105,13 +104,8 @@ public class LocalAssignment implements Serializable { result = prime * result + ((jvm == null) ? 0 : jvm.hashCode()); result = prime * result + (int) (mem ^ (mem >>> 32)); result = prime * result + ((taskIds == null) ? 0 : taskIds.hashCode()); - result = - prime * result - + ((topologyId == null) ? 0 : topologyId.hashCode()); - result = - prime - * result - + ((topologyName == null) ? 0 : topologyName.hashCode()); + result = prime * result + ((topologyId == null) ? 0 : topologyId.hashCode()); + result = prime * result + ((topologyName == null) ? 0 : topologyName.hashCode()); result = prime * result + (int) (timeStamp & 0xffffffff); return result; } @@ -156,7 +150,6 @@ public class LocalAssignment implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java index 628e0f5..056b6f3 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java @@ -33,8 +33,7 @@ public class ProcessSimulator { * skip old function name: pid-counter */ - protected static ConcurrentHashMap<String, WorkerShutdown> processMap = - new ConcurrentHashMap<String, WorkerShutdown>(); + protected static ConcurrentHashMap<String, WorkerShutdown> processMap = new ConcurrentHashMap<String, WorkerShutdown>(); /** * Register process handler old function name: register-process http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java index 3f8acfc..bde8232 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java @@ -34,8 +34,7 @@ import com.alibaba.jstorm.task.TaskShutdownDameon; import com.alibaba.jstorm.utils.JStormUtils; /** - * Timely check whether topology is active or not and whether the metrics - * monitor is enable or disable from ZK + * Timely check whether topology is active or not and whether the metrics monitor is enable or disable from ZK * * @author yannian/Longda * @@ -63,9 +62,7 @@ public class RefreshActive extends RunnableCallback { this.conf = workerData.getStormConf(); this.zkCluster = workerData.getZkCluster(); this.topologyId = workerData.getTopologyId(); - this.frequence = - JStormUtils.parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS), - 10); + this.frequence = JStormUtils.parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS), 10); } @Override @@ -91,8 +88,7 @@ public class RefreshActive extends RunnableCallback { return; } - LOG.info("Old TopologyStatus:" + oldTopologyStatus - + ", new TopologyStatus:" + newTopologyStatus); + LOG.info("Old TopologyStatus:" + oldTopologyStatus + ", new TopologyStatus:" + newTopologyStatus); List<TaskShutdownDameon> tasks = workerData.getShutdownTasks(); if (tasks == null) { @@ -120,8 +116,7 @@ public class RefreshActive extends RunnableCallback { boolean newMonitorEnable = base.isEnableMonitor(); boolean oldMonitorEnable = monitorEnable.get(); if (newMonitorEnable != oldMonitorEnable) { - LOG.info("Change MonitorEnable from " + oldMonitorEnable - + " to " + newMonitorEnable); + LOG.info("Change MonitorEnable from " + oldMonitorEnable + " to " + newMonitorEnable); monitorEnable.set(newMonitorEnable); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java index 48cc945..130985b 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java @@ -17,23 +17,10 @@ */ package com.alibaba.jstorm.daemon.worker; -import java.io.FileNotFoundException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.messaging.IConnection; import backtype.storm.messaging.IContext; import backtype.storm.scheduler.WorkerSlot; - import com.alibaba.jstorm.callback.RunnableCallback; import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.cluster.StormConfig; @@ -42,9 +29,15 @@ import com.alibaba.jstorm.schedule.Assignment.AssignmentType; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; import com.alibaba.jstorm.task.Task; import com.alibaba.jstorm.task.TaskShutdownDameon; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat; import com.alibaba.jstorm.utils.JStormUtils; -import com.alibaba.jstorm.utils.TimeUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -56,8 +49,7 @@ import com.alibaba.jstorm.utils.TimeUtils; * */ public class RefreshConnections extends RunnableCallback { - private static Logger LOG = LoggerFactory - .getLogger(RefreshConnections.class); + private static Logger LOG = LoggerFactory.getLogger(RefreshConnections.class); private WorkerData workerData; @@ -102,13 +94,9 @@ public class RefreshConnections extends RunnableCallback { this.supervisorId = workerData.getSupervisorId(); // this.endpoint_socket_lock = endpoint_socket_lock; - frequence = - JStormUtils - .parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS), 5); + frequence = JStormUtils.parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS), 5); - taskTimeoutSecs = - JStormUtils.parseInt( - conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10); + taskTimeoutSecs = JStormUtils.parseInt(conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10); taskTimeoutSecs = taskTimeoutSecs * 3; } @@ -122,8 +110,7 @@ public class RefreshConnections extends RunnableCallback { // synchronized (this) { - Assignment assignment = - zkCluster.assignment_info(topologyId, this); + Assignment assignment = zkCluster.assignment_info(topologyId, this); if (assignment == null) { String errMsg = "Failed to get Assignment of " + topologyId; LOG.error(errMsg); @@ -137,47 +124,39 @@ public class RefreshConnections extends RunnableCallback { // updated. If so, the outbound // task map should be updated accordingly. try { - Long localAssignmentTS = - StormConfig.read_supervisor_topology_timestamp( - conf, topologyId); - if (localAssignmentTS.longValue() > workerData - .getAssignmentTs().longValue()) { + Long localAssignmentTS = StormConfig.read_supervisor_topology_timestamp(conf, topologyId); + if (localAssignmentTS.longValue() > workerData.getAssignmentTs().longValue()) { try { - if (assignment.getAssignmentType() == AssignmentType.Config) { + if (assignment.getAssignmentType() == AssignmentType.UpdateTopology) { LOG.info("Get config reload request for " + topologyId); // If config was updated, notify all tasks List<TaskShutdownDameon> taskShutdowns = workerData.getShutdownTasks(); Map newConf = StormConfig.read_supervisor_topology_conf(conf, topologyId); workerData.getStormConf().putAll(newConf); for (TaskShutdownDameon taskSD : taskShutdowns) { - taskSD.updateConf(newConf); + taskSD.update(newConf); } - workerData.setAssignmentType(AssignmentType.Config); + workerData.setAssignmentType(AssignmentType.UpdateTopology); } else { Set<Integer> addedTasks = getAddedTasks(assignment); - Set<Integer> removedTasks = - getRemovedTasks(assignment); - + Set<Integer> removedTasks = getRemovedTasks(assignment); + Set<Integer> updatedTasks = getUpdatedTasks(assignment); + workerData.updateWorkerData(assignment); - - if (removedTasks.size() > 0) - shutdownTasks(removedTasks); - if (addedTasks.size() > 0) - createTasks(addedTasks); - - Set<Integer> tmpOutboundTasks = - Worker.worker_output_tasks(workerData); + + shutdownTasks(removedTasks); + createTasks(addedTasks); + updateTasks(updatedTasks); + + Set<Integer> tmpOutboundTasks = Worker.worker_output_tasks(workerData); if (outboundTasks.equals(tmpOutboundTasks) == false) { for (int taskId : tmpOutboundTasks) { if (outboundTasks.contains(taskId) == false) - workerData - .addOutboundTaskStatusIfAbsent(taskId); + workerData.addOutboundTaskStatusIfAbsent(taskId); } - for (int taskId : workerData - .getOutboundTaskStatus().keySet()) { + for (int taskId : workerData.getOutboundTaskStatus().keySet()) { if (tmpOutboundTasks.contains(taskId) == false) { - workerData - .removeOutboundTaskStatus(taskId); + workerData.removeOutboundTaskStatus(taskId); } } workerData.setOutboundTasks(tmpOutboundTasks); @@ -196,23 +175,19 @@ public class RefreshConnections extends RunnableCallback { } } catch (FileNotFoundException e) { - LOG.warn( - "Failed to read supervisor topology timeStamp for " - + topologyId + " port=" - + workerData.getPort(), e); + LOG.warn("Failed to read supervisor topology timeStamp for " + topologyId + " port=" + workerData.getPort(), e); } Set<ResourceWorkerSlot> workers = assignment.getWorkers(); if (workers == null) { - String errMsg = - "Failed to get taskToResource of " + topologyId; + String errMsg = "Failed to get taskToResource of " + topologyId; LOG.error(errMsg); return; } - workerData.getWorkerToResource().addAll(workers); - Map<Integer, WorkerSlot> my_assignment = - new HashMap<Integer, WorkerSlot>(); + workerData.updateWorkerToResource(workers); + + Map<Integer, WorkerSlot> my_assignment = new HashMap<Integer, WorkerSlot>(); Map<String, String> node = assignment.getNodeHost(); @@ -220,11 +195,13 @@ public class RefreshConnections extends RunnableCallback { Set<WorkerSlot> need_connections = new HashSet<WorkerSlot>(); Set<Integer> localTasks = new HashSet<Integer>(); + Set<Integer> localNodeTasks = new HashSet<Integer>(); if (workers != null && outboundTasks != null) { for (ResourceWorkerSlot worker : workers) { - if (supervisorId.equals(worker.getNodeId()) - && worker.getPort() == workerData.getPort()) + if (supervisorId.equals(worker.getNodeId())) + localNodeTasks.addAll(worker.getTasks()); + if (supervisorId.equals(worker.getNodeId()) && worker.getPort() == workerData.getPort()) localTasks.addAll(worker.getTasks()); for (Integer id : worker.getTasks()) { if (outboundTasks.contains(id)) { @@ -236,6 +213,7 @@ public class RefreshConnections extends RunnableCallback { } taskNodeport.putAll(my_assignment); workerData.setLocalTasks(localTasks); + workerData.setLocalNodeTasks(localNodeTasks); // get which connection need to be remove or add Set<WorkerSlot> current_connections = nodeportSocket.keySet(); @@ -274,18 +252,9 @@ public class RefreshConnections extends RunnableCallback { nodeportSocket.remove(node_port).close(); } - // Update the status of all outbound tasks + // check the status of connections to all outbound tasks for (Integer taskId : outboundTasks) { - boolean isActive = false; - int currentTime = TimeUtils.current_time_secs(); - TaskHeartbeat tHB = - zkCluster.task_heartbeat(topologyId, taskId); - if (tHB != null) { - int taskReportTime = tHB.getTimeSecs(); - if ((currentTime - taskReportTime) < taskTimeoutSecs) - isActive = true; - } - workerData.updateOutboundTaskStatus(taskId, isActive); + workerData.updateOutboundTaskStatus(taskId, isOutTaskConnected(taskId)); } } } catch (Exception e) { @@ -307,16 +276,13 @@ public class RefreshConnections extends RunnableCallback { private Set<Integer> getAddedTasks(Assignment assignment) { Set<Integer> ret = new HashSet<Integer>(); try { - Set<Integer> taskIds = - assignment.getCurrentWorkerTasks( - workerData.getSupervisorId(), workerData.getPort()); + Set<Integer> taskIds = assignment.getCurrentWorkerTasks(workerData.getSupervisorId(), workerData.getPort()); for (Integer taskId : taskIds) { if (!(workerData.getTaskids().contains(taskId))) ret.add(taskId); } } catch (Exception e) { - LOG.warn("Failed to get added task list for" - + workerData.getTopologyId()); + LOG.warn("Failed to get added task list for" + workerData.getTopologyId()); ; } return ret; @@ -325,22 +291,36 @@ public class RefreshConnections extends RunnableCallback { private Set<Integer> getRemovedTasks(Assignment assignment) { Set<Integer> ret = new HashSet<Integer>(); try { - Set<Integer> taskIds = - assignment.getCurrentWorkerTasks( - workerData.getSupervisorId(), workerData.getPort()); + Set<Integer> taskIds = assignment.getCurrentWorkerTasks(workerData.getSupervisorId(), workerData.getPort()); for (Integer taskId : workerData.getTaskids()) { if (!(taskIds.contains(taskId))) ret.add(taskId); } } catch (Exception e) { - LOG.warn("Failed to get removed task list for" - + workerData.getTopologyId()); + LOG.warn("Failed to get removed task list for" + workerData.getTopologyId()); ; } return ret; } + private Set<Integer> getUpdatedTasks(Assignment assignment) { + Set<Integer> ret = new HashSet<Integer>(); + try { + Set<Integer> taskIds = assignment.getCurrentWorkerTasks(workerData.getSupervisorId(), workerData.getPort()); + for (Integer taskId : taskIds) { + if ((workerData.getTaskids().contains(taskId))) + ret.add(taskId); + } + } catch (Exception e) { + LOG.warn("Failed to get updated task list for" + workerData.getTopologyId()); + } + return ret; + } + private void createTasks(Set<Integer> tasks) { + if (tasks == null) + return; + for (Integer taskId : tasks) { try { TaskShutdownDameon shutdown = Task.mk_task(workerData, taskId); @@ -352,17 +332,50 @@ public class RefreshConnections extends RunnableCallback { } private void shutdownTasks(Set<Integer> tasks) { - for (Integer taskId : tasks) { + if (tasks == null) + return; + + List<TaskShutdownDameon> shutdowns = workerData.getShutdownDaemonbyTaskIds(tasks); + for (TaskShutdownDameon shutdown : shutdowns) { try { - List<TaskShutdownDameon> shutdowns = - workerData.getShutdownDaemonbyTaskIds(tasks); - for (TaskShutdownDameon shutdown : shutdowns) { - shutdown.shutdown(); - } + shutdown.shutdown(); } catch (Exception e) { - LOG.error("Failed to shutdown task-" + taskId, e); + LOG.error("Failed to shutdown task-" + shutdown.getTaskId(), e); } } } + private void updateTasks(Set<Integer> tasks) { + if (tasks == null) + return; + + List<TaskShutdownDameon> shutdowns = workerData.getShutdownDaemonbyTaskIds(tasks); + for (TaskShutdownDameon shutdown : shutdowns) { + try { + shutdown.getTask().updateTaskData(); + } catch (Exception e) { + LOG.error("Failed to update task-" + shutdown.getTaskId(), e); + } + } + } + + private boolean isOutTaskConnected(int taskId) { + boolean ret = false; + + if (workerData.getInnerTaskTransfer().get(taskId) != null) { + // Connections to inner tasks should be done after initialization. + // So return true here for all inner tasks. + ret = true; + } else { + WorkerSlot slot = taskNodeport.get(taskId); + if (slot != null) { + IConnection connection = nodeportSocket.get(slot); + if (connection != null) { + ret = connection.available(); + } + } + } + + return ret; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java index 2006b05..97932b9 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java @@ -21,7 +21,6 @@ import backtype.storm.daemon.Shutdownable; import com.alibaba.jstorm.cluster.DaemonCommon; -public interface ShutdownableDameon extends Shutdownable, DaemonCommon, - Runnable { +public interface ShutdownableDameon extends Shutdownable, DaemonCommon, Runnable { } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java index 21dc37c..a769cc1 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java @@ -38,23 +38,21 @@ import com.alibaba.jstorm.utils.DisruptorRunable; * */ public class VirtualPortDispatch extends DisruptorRunable { - private final static Logger LOG = LoggerFactory - .getLogger(VirtualPortDispatch.class); + private final static Logger LOG = LoggerFactory.getLogger(VirtualPortDispatch.class); private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues; private IConnection recvConnection; - public VirtualPortDispatch(WorkerData workerData, - IConnection recvConnection, DisruptorQueue recvQueue) { + public VirtualPortDispatch(WorkerData workerData, IConnection recvConnection, DisruptorQueue recvQueue) { super(recvQueue, MetricDef.DISPATCH_THREAD); this.recvConnection = recvConnection; this.deserializeQueues = workerData.getDeserializeQueues(); } - + public void shutdownRecv() { - // don't need send shutdown command to every task + // don't need send shutdown command to every task // due to every task has been shutdown by workerData.active // at the same time queue has been fulll // byte shutdownCmd[] = { TaskStatus.SHUTDOWN }; @@ -87,8 +85,7 @@ public class VirtualPortDispatch extends DisruptorRunable { DisruptorQueue queue = deserializeQueues.get(task); if (queue == null) { - LOG.warn("Received invalid message directed at port " + task - + ". Dropping..."); + LOG.warn("Received invalid message directed at port " + task + ". Dropping..."); return; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java index d5cf9c8..2bf4c9c 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java @@ -17,22 +17,6 @@ */ package com.alibaba.jstorm.daemon.worker; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; @@ -41,30 +25,31 @@ import backtype.storm.messaging.IContext; import backtype.storm.task.TopologyContext; import backtype.storm.utils.DisruptorQueue; import backtype.storm.utils.Utils; - import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.callback.RunnableCallback; import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.cluster.Common; import com.alibaba.jstorm.cluster.StormConfig; +import com.alibaba.jstorm.metric.JStormMetricsReporter; import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb; import com.alibaba.jstorm.daemon.worker.hearbeat.WorkerHeartbeatRunable; -import com.alibaba.jstorm.metric.JStormMetricsReporter; import com.alibaba.jstorm.task.Task; import com.alibaba.jstorm.task.TaskShutdownDameon; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable; import com.alibaba.jstorm.utils.JStormServerUtils; import com.alibaba.jstorm.utils.JStormUtils; -import com.alibaba.jstorm.utils.NetWorkUtils; import com.alibaba.jstorm.utils.PathUtils; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.*; /** * worker entrance * * @author yannian/Longda - * */ public class Worker { @@ -76,26 +61,14 @@ public class Worker { private WorkerData workerData; @SuppressWarnings({ "rawtypes", "unchecked" }) - public Worker(Map conf, IContext context, String topology_id, - String supervisor_id, int port, String worker_id, String jar_path) - throws Exception { - - workerData = - new WorkerData(conf, context, topology_id, supervisor_id, port, - worker_id, jar_path); - + public Worker(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path) throws Exception { + workerData = new WorkerData(conf, context, topology_id, supervisor_id, port, worker_id, jar_path); } /** * get current task's output task list - * - * @param tasks_component - * @param mk_topology_context - * @param task_ids - * @throws Exception */ public static Set<Integer> worker_output_tasks(WorkerData workerData) { - ContextMaker context_maker = workerData.getContextMaker(); Set<Integer> task_ids = workerData.getTaskids(); StormTopology topology = workerData.getSysTopology(); @@ -103,16 +76,13 @@ public class Worker { Set<Integer> rtn = new HashSet<Integer>(); for (Integer taskid : task_ids) { - TopologyContext context = - context_maker.makeTopologyContext(topology, taskid, null); + TopologyContext context = context_maker.makeTopologyContext(topology, taskid, null); // <StreamId, <ComponentId, Grouping>> - Map<String, Map<String, Grouping>> targets = - context.getThisTargets(); + Map<String, Map<String, Grouping>> targets = context.getThisTargets(); for (Map<String, Grouping> e : targets.values()) { for (String componentId : e.keySet()) { - List<Integer> tasks = - context.getComponentTasks(componentId); + List<Integer> tasks = context.getComponentTasks(componentId); rtn.addAll(tasks); } } @@ -140,45 +110,46 @@ public class Worker { Set<Integer> taskids = workerData.getTaskids(); + Set<Thread> threads = new HashSet<Thread>(); + List<Task> taskArrayList = new ArrayList<Task>(); for (int taskid : taskids) { - - TaskShutdownDameon t = Task.mk_task(workerData, taskid); - - shutdowntasks.add(t); + Task task = new Task(workerData, taskid); + Thread thread =new Thread(task); + threads.add(thread); + taskArrayList.add(task); + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + for (Task t : taskArrayList){ + shutdowntasks.add(t.getTaskShutdownDameon()); } - return shutdowntasks; } - + @Deprecated private DisruptorQueue startDispatchDisruptor() { - Map stormConf = workerData.getStormConf(); - - int queue_size = - Utils.getInt( - stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE), - 1024); - WaitStrategy waitStrategy = - (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf); - DisruptorQueue recvQueue = - DisruptorQueue.mkInstance("Dispatch", ProducerType.MULTI, - queue_size, waitStrategy); + Map stormConf = workerData.getStormConf(); + + int queue_size = Utils.getInt(stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE), 1024); + WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf); + DisruptorQueue recvQueue = DisruptorQueue.mkInstance("Dispatch", ProducerType.MULTI, queue_size, waitStrategy); // stop consumerStarted recvQueue.consumerStarted(); - + return recvQueue; } private void startDispatchThread() { - // remove dispatch thread, send tuple directly from nettyserver - //startDispatchDisruptor(); + // remove dispatch thread, send tuple directly from nettyserver + // startDispatchDisruptor(); IContext context = workerData.getContext(); String topologyId = workerData.getTopologyId(); - IConnection recvConnection = - context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues()); - + IConnection recvConnection = context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues()); + workerData.setRecvConnection(recvConnection); } @@ -191,40 +162,27 @@ public class Worker { // so create client connection before create task // refresh connection RefreshConnections refreshConn = makeRefreshConnections(); - AsyncLoopThread refreshconn = - new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY, - true); + AsyncLoopThread refreshconn = new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY, true); threads.add(refreshconn); // refresh ZK active status RefreshActive refreshZkActive = new RefreshActive(workerData); - AsyncLoopThread refreshzk = - new AsyncLoopThread(refreshZkActive, false, - Thread.MIN_PRIORITY, true); + AsyncLoopThread refreshzk = new AsyncLoopThread(refreshZkActive, false, Thread.MIN_PRIORITY, true); threads.add(refreshzk); // Sync heartbeat to Apsara Container - AsyncLoopThread syncContainerHbThread = - SyncContainerHb.mkWorkerInstance(workerData.getStormConf()); + AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkWorkerInstance(workerData.getStormConf()); if (syncContainerHbThread != null) { threads.add(syncContainerHbThread); } - JStormMetricsReporter metricReporter = - new JStormMetricsReporter(workerData); - AsyncLoopThread metricThread = new AsyncLoopThread(metricReporter); - threads.add(metricThread); - - // create task heartbeat - TaskHeartbeatRunable taskHB = new TaskHeartbeatRunable(workerData); - AsyncLoopThread taskHBThread = new AsyncLoopThread(taskHB); - threads.add(taskHBThread); + JStormMetricsReporter metricReporter = new JStormMetricsReporter(workerData); + metricReporter.init(); + workerData.setMetricsReporter(metricReporter); // refresh hearbeat to Local dir RunnableCallback heartbeat_fn = new WorkerHeartbeatRunable(workerData); - AsyncLoopThread hb = - new AsyncLoopThread(heartbeat_fn, false, null, - Thread.NORM_PRIORITY, true); + AsyncLoopThread hb = new AsyncLoopThread(heartbeat_fn, false, null, Thread.NORM_PRIORITY, true); threads.add(hb); // shutdown task callbacks @@ -239,7 +197,6 @@ public class Worker { * create worker instance and run it * * @param conf - * @param mq_context * @param topology_id * @param supervisor_id * @param port @@ -248,9 +205,8 @@ public class Worker { * @throws Exception */ @SuppressWarnings("rawtypes") - public static WorkerShutdown mk_worker(Map conf, IContext context, - String topology_id, String supervisor_id, int port, - String worker_id, String jar_path) throws Exception { + public static WorkerShutdown mk_worker(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path) + throws Exception { StringBuilder sb = new StringBuilder(); sb.append("topologyId:" + topology_id + ", "); @@ -260,9 +216,7 @@ public class Worker { LOG.info("Begin to run worker:" + sb.toString()); - Worker w = - new Worker(conf, context, topology_id, supervisor_id, port, - worker_id, jar_path); + Worker w = new Worker(conf, context, topology_id, supervisor_id, port, worker_id, jar_path); w.redirectOutput(); @@ -271,8 +225,7 @@ public class Worker { public void redirectOutput() { - if (System.getenv("REDIRECT") == null - || !System.getenv("REDIRECT").equals("true")) { + if (System.getenv("REDIRECT") == null || !System.getenv("REDIRECT").equals("true")) { return; } @@ -283,9 +236,7 @@ public class Worker { DEFAULT_OUT_TARGET_FILE += ".out"; } - String outputFile = - ConfigExtension.getWorkerRedirectOutputFile(workerData - .getStormConf()); + String outputFile = ConfigExtension.getWorkerRedirectOutputFile(workerData.getStormConf()); if (outputFile == null) { outputFile = DEFAULT_OUT_TARGET_FILE; } else { @@ -302,7 +253,6 @@ public class Worker { outputFile = DEFAULT_OUT_TARGET_FILE; } } - } catch (Exception e) { LOG.warn("Failed to touch " + outputFile, e); outputFile = DEFAULT_OUT_TARGET_FILE; @@ -318,9 +268,7 @@ public class Worker { } /** - * Have one problem if the worker's start parameter length is longer than - * 4096, ps -ef|grep com.alibaba.jstorm.daemon.worker.Worker can't find - * worker + * Have one problem if the worker's start parameter length is longer than 4096, ps -ef|grep com.alibaba.jstorm.daemon.worker.Worker can't find worker * * @param port */ @@ -341,15 +289,11 @@ public class Worker { try { LOG.info("Begin to execute " + sb.toString()); - Process process = - JStormUtils.launch_process(sb.toString(), - new HashMap<String, String>(), false); - + Process process = JStormUtils.launch_process(sb.toString(), new HashMap<String, String>(), false); // Process process = Runtime.getRuntime().exec(sb.toString()); InputStream stdin = process.getInputStream(); - BufferedReader reader = - new BufferedReader(new InputStreamReader(stdin)); + BufferedReader reader = new BufferedReader(new InputStreamReader(stdin)); JStormUtils.sleepMs(1000); @@ -405,7 +349,6 @@ public class Worker { LOG.info("Skip kill myself"); continue; } - Integer pid = Integer.valueOf(fields[1]); LOG.info("Find one process :" + pid.toString()); @@ -415,9 +358,7 @@ public class Worker { continue; } } - } - return ret; } catch (IOException e) { LOG.info("Failed to execute " + sb.toString()); @@ -429,13 +370,10 @@ public class Worker { } public static void killOldWorker(String port) { - List<Integer> oldPids = getOldPortPids(port); for (Integer pid : oldPids) { - JStormUtils.kill(pid); } - } /** @@ -456,7 +394,6 @@ public class Worker { } StringBuilder sb = new StringBuilder(); - try { String topology_id = args[0]; String supervisor_id = args[1]; @@ -476,9 +413,7 @@ public class Worker { sb.append("workerId:" + worker_id + ", "); sb.append("jar_path:" + jar_path + "\n"); - WorkerShutdown sd = - mk_worker(conf, null, topology_id, supervisor_id, - Integer.parseInt(port_str), worker_id, jar_path); + WorkerShutdown sd = mk_worker(conf, null, topology_id, supervisor_id, Integer.parseInt(port_str), worker_id, jar_path); sd.join(); LOG.info("Successfully shutdown worker " + sb.toString());
