[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192876#comment-15192876
 ] 

ASF GitHub Bot commented on STORM-1279:
---------------------------------------

Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1184#discussion_r55965454
  
    --- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java 
---
    @@ -0,0 +1,631 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon.supervisor;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.cluster.IStateStorage;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.event.EventManager;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.localizer.LocalResource;
    +import org.apache.storm.localizer.LocalizedResource;
    +import org.apache.storm.localizer.Localizer;
    +import org.apache.storm.utils.*;
    +import org.apache.thrift.transport.TTransportException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.net.JarURLConnection;
    +import java.net.URL;
    +import java.nio.file.Files;
    +import java.nio.file.StandardCopyOption;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +public class SyncSupervisorEvent implements Runnable {
    +
    +    private static final Logger LOG = 
LoggerFactory.getLogger(SyncSupervisorEvent.class);
    +
    +    private EventManager syncSupEventManager;
    +    private EventManager syncProcessManager;
    +    private IStormClusterState stormClusterState;
    +    private LocalState localState;
    +    private SyncProcessEvent syncProcesses;
    +    private SupervisorData supervisorData;
    +
    +    public SyncSupervisorEvent(SupervisorData supervisorData, 
SyncProcessEvent syncProcesses, EventManager syncSupEventManager,
    +            EventManager syncProcessManager) {
    +
    +        this.syncProcesses = syncProcesses;
    +        this.syncSupEventManager = syncSupEventManager;
    +        this.syncProcessManager = syncProcessManager;
    +        this.stormClusterState = supervisorData.getStormClusterState();
    +        this.localState = supervisorData.getLocalState();
    +        this.supervisorData = supervisorData;
    +    }
    +
    +    @Override
    +    public void run() {
    +        try {
    +            Map conf = supervisorData.getConf();
    +            Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager);
    +            List<String> stormIds = 
stormClusterState.assignments(syncCallback);
    +            Map<String, Map<String, Object>> assignmentsSnapshot =
    +                    getAssignmentsSnapshot(stormClusterState, stormIds, 
supervisorData.getAssignmentVersions().get(), syncCallback);
    +            Map<String, List<ProfileRequest>> stormIdToProfilerActions = 
getProfileActions(stormClusterState, stormIds);
    +
    +            Set<String> allDownloadedTopologyIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
    +            Map<String, String> stormcodeMap = 
readStormCodeLocations(assignmentsSnapshot);
    +            Map<Integer, LocalAssignment> existingAssignment = 
localState.getLocalAssignmentsMap();
    +            if (existingAssignment == null) {
    +                existingAssignment = new HashMap<>();
    +            }
    +
    +            Map<Integer, LocalAssignment> allAssignment =
    +                    readAssignments(assignmentsSnapshot, 
existingAssignment, supervisorData.getAssignmentId(), 
supervisorData.getSyncRetry());
    +
    +            Map<Integer, LocalAssignment> newAssignment = new HashMap<>();
    +            Set<String> assignedStormIds = new HashSet<>();
    +
    +            for (Map.Entry<Integer, LocalAssignment> entry : 
allAssignment.entrySet()) {
    +                if 
(supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) {
    +                    newAssignment.put(entry.getKey(), entry.getValue());
    +                    
assignedStormIds.add(entry.getValue().get_topology_id());
    +                }
    +            }
    +
    +            Set<String> srashStormIds = verifyDownloadedFiles(conf, 
supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds);
    +            Set<String> downloadedStormIds = new HashSet<>();
    +            downloadedStormIds.addAll(allDownloadedTopologyIds);
    +            downloadedStormIds.removeAll(srashStormIds);
    +
    +            LOG.debug("Synchronizing supervisor");
    +            LOG.debug("Storm code map: {}", stormcodeMap);
    +            LOG.debug("All assignment: {}", allAssignment);
    +            LOG.debug("New assignment: {}", newAssignment);
    +            LOG.debug("Assigned Storm Ids {}", assignedStormIds);
    +            LOG.debug("All Downloaded Ids {}", allDownloadedTopologyIds);
    +            LOG.debug("Checked Downloaded Ids {}", srashStormIds);
    +            LOG.debug("Downloaded Ids {}", downloadedStormIds);
    +            LOG.debug("Storm Ids Profiler Actions {}", 
stormIdToProfilerActions);
    +            // download code first
    +            // This might take awhile
    +            // - should this be done separately from usual monitoring?
    +            // should we only download when topology is assigned to this 
supervisor?
    +            for (Map.Entry<String, String> entry : 
stormcodeMap.entrySet()) {
    +                String stormId = entry.getKey();
    +                if (!downloadedStormIds.contains(stormId) && 
assignedStormIds.contains(stormId)) {
    +                    LOG.info("Downloading code for storm id {}.", stormId);
    +                    try {
    +                        downloadStormCode(conf, stormId, entry.getValue(), 
supervisorData.getLocalizer());
    +                    } catch (Exception e) {
    +                        if 
(Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
    +                            LOG.warn("Nimbus leader was not available.", 
e);
    +                        } else if 
(Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
    +                            LOG.warn("There was a connection problem with 
nimbus.", e);
    +                        } else {
    +                            throw e;
    +                        }
    +                    }
    +                    LOG.info("Finished downloading code for storm id {}", 
stormId);
    +                }
    +            }
    +
    +            LOG.debug("Writing new assignment {}", newAssignment);
    +
    +            Set<Integer> killWorkers = new HashSet<>();
    +            killWorkers.addAll(existingAssignment.keySet());
    +            killWorkers.removeAll(newAssignment.keySet());
    +            for (Integer port : killWorkers) {
    +                supervisorData.getiSupervisor().killedWorker(port);
    +            }
    +
    +            killExistingWorkersWithChangeInComponents(supervisorData, 
existingAssignment, newAssignment);
    +
    +            
supervisorData.getiSupervisor().assigned(newAssignment.keySet());
    +            localState.setLocalAssignmentsMap(newAssignment);
    +            supervisorData.setAssignmentVersions(assignmentsSnapshot);
    +            
supervisorData.setStormIdToProfileActions(stormIdToProfilerActions);
    +
    +            Map<Long, LocalAssignment> convertNewAssignment = new 
HashMap<>();
    +            for (Map.Entry<Integer, LocalAssignment> entry : 
newAssignment.entrySet()) {
    +                convertNewAssignment.put(entry.getKey().longValue(), 
entry.getValue());
    +            }
    +            supervisorData.setCurrAssignment(convertNewAssignment);
    +            // remove any downloaded code that's no longer assigned or 
active
    +            // important that this happens after setting the local 
assignment so that
    +            // synchronize-supervisor doesn't try to launch workers for 
which the
    +            // resources don't exist
    +            if (Utils.isOnWindows()) {
    +                shutdownDisallowedWorkers();
    +            }
    +            for (String stormId : allDownloadedTopologyIds) {
    +                if (!stormcodeMap.containsKey(stormId)) {
    +                    LOG.info("Removing code for storm id {}.", stormId);
    +                    rmTopoFiles(conf, stormId, 
supervisorData.getLocalizer(), true);
    +                }
    +            }
    +            syncProcessManager.add(syncProcesses);
    +        } catch (Exception e) {
    +            LOG.error("Failed to Sync Supervisor", e);
    +            throw new RuntimeException(e);
    +        }
    +
    +    }
    +
    +    private void killExistingWorkersWithChangeInComponents(SupervisorData 
supervisorData, Map<Integer, LocalAssignment> existingAssignment,
    +            Map<Integer, LocalAssignment> newAssignment) throws Exception {
    +        LocalState localState = supervisorData.getLocalState();
    +        Map<Integer, LocalAssignment> assignedExecutors = 
localState.getLocalAssignmentsMap();
    +        if (assignedExecutors == null) {
    +            assignedExecutors = new HashMap<>();
    +        }
    +        int now = Time.currentTimeSecs();
    +        Map<String, StateHeartbeat> workerIdHbstate = 
syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
    +        Map<Integer, String> vaildPortToWorkerIds = new HashMap<>();
    +        for (Map.Entry<String, StateHeartbeat> entry : 
workerIdHbstate.entrySet()) {
    +            String workerId = entry.getKey();
    +            StateHeartbeat stateHeartbeat = entry.getValue();
    +            if (stateHeartbeat != null && stateHeartbeat.getState() == 
State.VALID) {
    +                
vaildPortToWorkerIds.put(stateHeartbeat.getHeartbeat().get_port(), workerId);
    +            }
    +        }
    +
    +        Map<Integer, LocalAssignment> intersectAssignment = new 
HashMap<>();
    +        for (Map.Entry<Integer, LocalAssignment> entry : 
newAssignment.entrySet()) {
    +            Integer port = entry.getKey();
    +            if (existingAssignment.containsKey(port)) {
    +                intersectAssignment.put(port, entry.getValue());
    +            }
    +        }
    +
    +        for (Integer port : intersectAssignment.keySet()) {
    +            List<ExecutorInfo> existExecutors = 
existingAssignment.get(port).get_executors();
    +            List<ExecutorInfo> newExecutors = 
newAssignment.get(port).get_executors();
    +            if (newExecutors.size() != existExecutors.size()) {
    +                syncProcesses.shutWorker(supervisorData, 
vaildPortToWorkerIds.get(port));
    +                continue;
    +            }
    +            for (ExecutorInfo executorInfo : newExecutors) {
    +                if (!existExecutors.contains(executorInfo)) {
    +                    syncProcesses.shutWorker(supervisorData, 
vaildPortToWorkerIds.get(port));
    +                    break;
    +                }
    +            }
    +
    +        }
    +    }
    +
    +    protected Map<String, Map<String, Object>> 
getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> 
stormIds,
    +            Map<String, Map<String, Object>> localAssignmentVersion, 
Runnable callback) throws Exception {
    +        Map<String, Map<String, Object>> updateAssignmentVersion = new 
HashMap<>();
    +        for (String stormId : stormIds) {
    +            Integer recordedVersion = -1;
    +            Integer version = stormClusterState.assignmentVersion(stormId, 
callback);
    +            if (localAssignmentVersion.containsKey(stormId) && 
localAssignmentVersion.get(stormId) != null) {
    +                recordedVersion = (Integer) 
localAssignmentVersion.get(stormId).get(IStateStorage.VERSION);
    +            }
    +            if (version == null) {
    +                // ignore
    +            } else if (version == recordedVersion) {
    +                updateAssignmentVersion.put(stormId, 
localAssignmentVersion.get(stormId));
    +            } else {
    +                Map<String, Object> assignmentVersion = (Map<String, 
Object>) stormClusterState.assignmentInfoWithVersion(stormId, callback);
    +                updateAssignmentVersion.put(stormId, assignmentVersion);
    +            }
    +        }
    +        return updateAssignmentVersion;
    +    }
    +
    +    protected Map<String, List<ProfileRequest>> 
getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) 
throws Exception {
    +        Map<String, List<ProfileRequest>> ret = new HashMap<String, 
List<ProfileRequest>>();
    +        for (String stormId : stormIds) {
    +            List<ProfileRequest> profileRequests = 
stormClusterState.getTopologyProfileRequests(stormId);
    +            ret.put(stormId, profileRequests);
    +        }
    +        return ret;
    +    }
    +
    +    protected Map<String, String> readStormCodeLocations(Map<String, 
Map<String, Object>> assignmentsSnapshot) {
    +        Map<String, String> stormcodeMap = new HashMap<>();
    +        for (Map.Entry<String, Map<String, Object>> entry : 
assignmentsSnapshot.entrySet()) {
    +            Assignment assignment = (Assignment) 
(entry.getValue().get(IStateStorage.DATA));
    +            if (assignment != null) {
    +                stormcodeMap.put(entry.getKey(), 
assignment.get_master_code_dir());
    +            }
    +        }
    +        return stormcodeMap;
    +    }
    +
    +    /**
    +     * Remove a reference to a blob when its no longer needed.
    +     * 
    +     * @param localizer
    +     * @param stormId
    +     * @param conf
    +     */
    +    protected void removeBlobReferences(Localizer localizer, String 
stormId, Map conf) throws Exception {
    +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
    +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
    +        String user = (String) 
stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
    +        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
    +        if (blobstoreMap != null) {
    +            for (Map.Entry<String, Map<String, Object>> entry : 
blobstoreMap.entrySet()) {
    +                String key = entry.getKey();
    +                Map<String, Object> blobInfo = entry.getValue();
    +                localizer.removeBlobReference(key, user, topoName, 
SupervisorUtils.shouldUncompressBlob(blobInfo));
    +            }
    +        }
    +    }
    +
    +    protected void rmTopoFiles(Map conf, String stormId, Localizer 
localizer, boolean isrmBlobRefs) throws IOException {
    +        String path = ConfigUtils.supervisorStormDistRoot(conf, stormId);
    +        try {
    +            if (isrmBlobRefs) {
    +                removeBlobReferences(localizer, stormId, conf);
    +            }
    +            if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
    +                SupervisorUtils.rmrAsUser(conf, stormId, path);
    +            } else {
    +                
Utils.forceDelete(ConfigUtils.supervisorStormDistRoot(conf, stormId));
    +            }
    +        } catch (Exception e) {
    +            LOG.info("Exception removing: {} ", stormId, e);
    +        }
    +    }
    +
    +    /**
    +     * Check for the files exists to avoid supervisor crashing Also makes 
sure there is no necessity for locking"
    +     * 
    +     * @param conf
    +     * @param localizer
    +     * @param assignedStormIds
    +     * @param allDownloadedTopologyIds
    +     * @return
    +     */
    +    protected Set<String> verifyDownloadedFiles(Map conf, Localizer 
localizer, Set<String> assignedStormIds, Set<String> allDownloadedTopologyIds)
    +            throws IOException {
    +        Set<String> srashStormIds = new HashSet<>();
    +        for (String stormId : allDownloadedTopologyIds) {
    +            if (assignedStormIds.contains(stormId)) {
    +                if (!SupervisorUtils.doRequiredTopoFilesExist(conf, 
stormId)) {
    +                    LOG.debug("Files not present in topology directory");
    +                    rmTopoFiles(conf, stormId, localizer, false);
    +                    srashStormIds.add(stormId);
    +                }
    +            }
    +        }
    +        return srashStormIds;
    +    }
    +
    +    /**
    +     * download code ; two cluster mode: local and distributed
    +     *
    +     * @param conf
    +     * @param stormId
    +     * @param masterCodeDir
    +     * @throws IOException
    +     */
    +    private void downloadStormCode(Map conf, String stormId, String 
masterCodeDir, Localizer localizer) throws Exception {
    +        String clusterMode = ConfigUtils.clusterMode(conf);
    +
    +        if (clusterMode.endsWith("distributed")) {
    +            downloadDistributeStormCode(conf, stormId, masterCodeDir, 
localizer);
    +        } else if (clusterMode.endsWith("local")) {
    +            downloadLocalStormCode(conf, stormId, masterCodeDir, 
localizer);
    +        }
    +    }
    +
    +    private void downloadLocalStormCode(Map conf, String stormId, String 
masterCodeDir, Localizer localizer) throws Exception {
    +
    +        String tmproot = ConfigUtils.supervisorTmpDir(conf) + 
Utils.FILE_PATH_SEPARATOR + Utils.uuid();
    +        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, 
stormId);
    +        BlobStore blobStore = Utils.getNimbusBlobStore(conf, 
masterCodeDir, null);
    +        try {
    +            FileUtils.forceMkdir(new File(tmproot));
    +            String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
    +            String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
    +            String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
    +            String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
    +            blobStore.readBlobTo(stormCodeKey, new 
FileOutputStream(codePath), null);
    +            blobStore.readBlobTo(stormConfKey, new 
FileOutputStream(confPath), null);
    +        } finally {
    +            blobStore.shutdown();
    +        }
    +        FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
    +        SupervisorUtils.setupStormCodeDir(conf, 
ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
    +        ClassLoader classloader = 
Thread.currentThread().getContextClassLoader();
    +
    +        String resourcesJar = resourcesJar();
    +
    +        URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
    +
    +        String targetDir = stormroot + Utils.FILE_PATH_SEPARATOR + 
ConfigUtils.RESOURCES_SUBDIR;
    +
    +        if (resourcesJar != null) {
    +            LOG.info("Extracting resources from jar at {} to {}", 
resourcesJar, targetDir);
    +            Utils.extractDirFromJar(resourcesJar, 
ConfigUtils.RESOURCES_SUBDIR, stormroot);
    +        } else if (url != null) {
    +
    +            LOG.info("Copying resources at {} to {} ", url.toString(), 
targetDir);
    +            if (url.getProtocol() == "jar") {
    +                JarURLConnection urlConnection = (JarURLConnection) 
url.openConnection();
    +                
Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), 
ConfigUtils.RESOURCES_SUBDIR, stormroot);
    +            } else {
    +                FileUtils.copyDirectory(new File(url.getFile()), (new 
File(targetDir)));
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Downloading to permanent location is atomic
    +     * 
    +     * @param conf
    +     * @param stormId
    +     * @param masterCodeDir
    +     * @param localizer
    +     * @throws Exception
    +     */
    +    private void downloadDistributeStormCode(Map conf, String stormId, 
String masterCodeDir, Localizer localizer) throws Exception {
    +
    +        String tmproot = ConfigUtils.supervisorTmpDir(conf) + 
Utils.FILE_PATH_SEPARATOR + Utils.uuid();
    +        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, 
stormId);
    +        ClientBlobStore blobStore = 
Utils.getClientBlobStoreForSupervisor(conf);
    +        FileUtils.forceMkdir(new File(tmproot));
    +        if (Utils.isOnWindows()) {
    +            if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
    +                throw new RuntimeException("ERROR: Windows doesn't 
implement setting the correct permissions");
    +            }
    +        } else {
    +            Utils.restrictPermissions(tmproot);
    +        }
    +        String stormJarKey = ConfigUtils.masterStormJarKey(stormId);
    +        String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
    +        String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
    +        String jarPath = ConfigUtils.supervisorStormJarPath(tmproot);
    +        String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
    +        String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
    +        Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, 
blobStore);
    +        Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, 
blobStore);
    +        Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, 
blobStore);
    +        blobStore.shutdown();
    +        Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, 
tmproot);
    +        downloadBlobsForTopology(conf, confPath, localizer, tmproot);
    +        if (IsDownloadBlobsForTopologySucceed(confPath, tmproot)) {
    +            LOG.info("Successfully downloaded blob resources for storm-id 
{}", stormId);
    +            FileUtils.forceMkdir(new File(stormroot));
    +            Files.move(new File(tmproot).toPath(), new 
File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE);
    +            SupervisorUtils.setupStormCodeDir(conf, 
ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
    +        } else {
    +            LOG.info("Failed to download blob resources for storm-id ", 
stormId);
    +            Utils.forceDelete(tmproot);
    +        }
    +    }
    +
    +    /**
    +     * Assert if all blobs are downloaded for the given topology
    +     * 
    +     * @param stormconfPath
    +     * @param targetDir
    +     * @return
    +     */
    +    protected boolean IsDownloadBlobsForTopologySucceed(String 
stormconfPath, String targetDir) throws IOException {
    +        Map stormConf = 
Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new 
File(stormconfPath)));
    +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
    +        List<String> blobFileNames = new ArrayList<>();
    +        if (blobstoreMap != null) {
    +            for (Map.Entry<String, Map<String, Object>> entry : 
blobstoreMap.entrySet()) {
    +                String key = entry.getKey();
    +                Map<String, Object> blobInfo = entry.getValue();
    +                String ret = null;
    +                if (blobInfo != null && blobInfo.containsKey("localname")) 
{
    +                    ret = (String) blobInfo.get("localname");
    +                } else {
    +                    ret = key;
    +                }
    +                blobFileNames.add(ret);
    +            }
    +        }
    +        for (String string : blobFileNames) {
    +            if (!Utils.checkFileExists(string))
    +                return false;
    +        }
    +        return true;
    +    }
    +
    +    /**
    +     * Download all blobs listed in the topology configuration for a given 
topology.
    +     * 
    +     * @param conf
    +     * @param stormconfPath
    +     * @param localizer
    +     * @param tmpRoot
    +     */
    +    protected void downloadBlobsForTopology(Map conf, String 
stormconfPath, Localizer localizer, String tmpRoot) throws IOException {
    +        Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, 
stormconfPath);
    +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
    +        String user = (String) 
stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
    +        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
    +        File userDir = localizer.getLocalUserFileCacheDir(user);
    +        List<LocalResource> localResourceList = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
    +        if (localResourceList.size() > 0) {
    +            if (!userDir.exists()) {
    +                FileUtils.forceMkdir(userDir);
    +            }
    +            try {
    +                List<LocalizedResource> localizedResources = 
localizer.getBlobs(localResourceList, user, topoName, userDir);
    +                setupBlobPermission(conf, user, userDir.toString());
    +                for (LocalizedResource localizedResource : 
localizedResources) {
    +                    File rsrcFilePath = new 
File(localizedResource.getFilePath());
    +                    String keyName = rsrcFilePath.getName();
    +                    String blobSymlinkTargetName = new 
File(localizedResource.getCurrentSymlinkPath()).getName();
    +
    +                    String symlinkName = null;
    +                    if (blobstoreMap != null) {
    +                        Map<String, Object> blobInfo = 
blobstoreMap.get(keyName);
    +                        if (blobInfo != null && 
blobInfo.containsKey("localname")) {
    --- End diff --
    
    This logic was wrapped inside get-blob-localname in Clojure. In Java, it is
duplicated in three places.


> port backtype.storm.daemon.supervisor to java
> ---------------------------------------------
>
>                 Key: STORM-1279
>                 URL: https://issues.apache.org/jira/browse/STORM-1279
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: John Fang
>              Labels: java-migration, jstorm-merger
>         Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with built-in Java thread pools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to