Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2345#discussion_r143510721
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
    @@ -156,17 +146,141 @@ public AsyncLocalizer(Map<String, Object> conf, 
AtomicReference<Map<Long, LocalA
                 DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
     
             execService = Executors.newScheduledThreadPool(threadPoolSize,
    -            new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer 
Executor").build());
    +            new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer 
Executor - %d").build());
             reconstructLocalizedResources();
     
             symlinksDisabled = 
(boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
    -        basicPending = new HashMap<>();
             blobPending = new HashMap<>();
             this.currAssignment = currAssignment;
     
             recoverBlobReferences(portToAssignments);
         }
     
    +    public AsyncLocalizer(Map<String, Object> conf, 
AtomicReference<Map<Long, LocalAssignment>> currAssignment,
    +                          Map<Integer, LocalAssignment> portToAssignments) 
throws IOException {
    +        this(conf, AdvancedFSOps.make(conf), 
ConfigUtils.supervisorLocalDir(conf), currAssignment, portToAssignments);
    +    }
    +
    +    @VisibleForTesting
    +    LocallyCachedBlob getTopoJar(final String topologyId) throws 
IOException {
    +        String topoJarKey = ConfigUtils.masterStormJarKey(topologyId);
    +        LocallyCachedBlob topoJar = topologyBlobs.get(topoJarKey);
    +        if (topoJar == null) {
    +            topoJar = new LocallyCachedTopologyBlob(topologyId, 
isLocalMode, conf, fsOps,
    +                LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR);
    +            topologyBlobs.put(topoJarKey, topoJar);
    +        }
    +        return topoJar;
    +    }
    +
    +    @VisibleForTesting
    +    LocallyCachedBlob getTopoCode(final String topologyId) throws 
IOException {
    +        String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
    +        LocallyCachedBlob topoCode = topologyBlobs.get(topoCodeKey);
    +        if (topoCode == null) {
    +            topoCode = new LocallyCachedTopologyBlob(topologyId, 
isLocalMode, conf, fsOps,
    +                LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE);
    +            topologyBlobs.put(topoCodeKey, topoCode);
    +        }
    +        return topoCode;
    +    }
    +
    +    @VisibleForTesting
    +    LocallyCachedBlob getTopoConf(final String topologyId) throws 
IOException {
    +        String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
    +        LocallyCachedBlob topoConf = topologyBlobs.get(topoConfKey);
    +        if (topoConf == null) {
    +            topoConf = new LocallyCachedTopologyBlob(topologyId, 
isLocalMode, conf, fsOps,
    +                LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF);
    +            topologyBlobs.put(topoConfKey, topoConf);
    +        }
    +        return topoConf;
    +    }
    +
    +    public synchronized CompletableFuture<Void> 
requestDownloadTopologyBlobs(final LocalAssignment assignment, final int port,
    +                                                                           
  final BlobChangingCallback cb) throws IOException {
    +        final String topologyId = assignment.get_topology_id();
    +
    +        CompletableFuture<Void> baseBlobs = 
requestDownloadBaseTopologyBlobs(assignment, port, cb);
    +        return baseBlobs.thenComposeAsync((v) -> {
    +            LocalDownloadedResource localResource = 
blobPending.get(topologyId);
    +            if (localResource == null) {
    +                Supplier<Void> supplier = new DownloadBlobs(topologyId, 
assignment.get_owner());
    +                localResource = new 
LocalDownloadedResource(CompletableFuture.supplyAsync(supplier, execService));
    +                blobPending.put(topologyId, localResource);
    +            }
    +            CompletableFuture<Void> r = localResource.reserve(port, 
assignment);
    +            LOG.debug("Reserved blobs {} {}", topologyId, localResource);
    +            return r;
    +        });
    +    }
    +
    +    @VisibleForTesting
    +    synchronized CompletableFuture<Void> 
requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int 
port,
    +                                                                          
BlobChangingCallback cb) throws IOException {
    +        PortAndAssignment pna = new PortAndAssignment(port, assignment);
    +        final String topologyId = assignment.get_topology_id();
    +
    +        LocallyCachedBlob topoJar = getTopoJar(topologyId);
    +        topoJar.addReference(pna, cb);
    +
    +        LocallyCachedBlob topoCode = getTopoCode(topologyId);
    +        topoCode.addReference(pna, cb);
    +
    +        LocallyCachedBlob topoConf = getTopoConf(topologyId);
    +        topoConf.addReference(pna, cb);
    +
    +        CompletableFuture<Void> ret = 
topologyBasicDownloaded.get(topologyId);
    +        if (ret == null) {
    +            ret = downloadOrUpdate(topoJar, topoCode, topoConf);
    +        }
    +        return ret;
    +    }
    +
    +    private static final int ATTEMPTS_INTERVAL_TIME = 100;
    +
    +    private CompletableFuture<Void> downloadOrUpdate(LocallyCachedBlob ... 
blobs) {
    +        CompletableFuture<Void> [] all = new 
CompletableFuture[blobs.length];
    +        for (int i = 0; i < blobs.length; i++) {
    +            final LocallyCachedBlob blob = blobs[i];
    +            all[i] = CompletableFuture.runAsync(() -> {
    +                LOG.debug("STARTING download of {}", blob);
    +                try (ClientBlobStore blobStore = 
ServerUtils.getClientBlobStoreForSupervisor(conf)) {
    +                    boolean done = false;
    +                    long failures = 0;
    +                    while (!done) {
    +                        try {
    +                            synchronized (blob) {
    +                                long localVersion = blob.getLocalVersion();
    +                                long remoteVersion = 
blob.getRemoteVersion(blobStore);
    +                                if (localVersion != remoteVersion) {
    +                                    try {
    +                                        long newVersion = 
blob.downloadToTempLocation(blobStore);
    +                                        
blob.informAllOfChangeAndWaitForConsensus();
    +                                        blob.commitNewVersion(newVersion);
    +                                        blob.informAllChangeComplete();
    +                                    } finally {
    +                                        blob.cleanupOrphanedData();
    +                                    }
    +                                }
    +                            }
    +                            done = true;
    +                        } catch (Exception e) {
    +                            failures++;
    +                            if (failures > blobDownloadRetries) {
    +                                throw new RuntimeException("Could not 
download...", e);
    +                            }
    +                            LOG.warn("Failed to download blob {} will try 
again in {} ms", blob, ATTEMPTS_INTERVAL_TIME, e);
    +                            Utils.sleep(ATTEMPTS_INTERVAL_TIME);
    +                        }
    +                    }
    +                }
    +                LOG.debug("FINISHED download of {}", blob);
    --- End diff --
    
    Should this be info?


---

Reply via email to