Github user kishorvpatil commented on a diff in the pull request: https://github.com/apache/storm/pull/2345#discussion_r143510721 --- Diff: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java --- @@ -156,17 +146,141 @@ public AsyncLocalizer(Map<String, Object> conf, AtomicReference<Map<Long, LocalA DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3); execService = Executors.newScheduledThreadPool(threadPoolSize, - new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor").build()); + new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build()); reconstructLocalizedResources(); symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false); - basicPending = new HashMap<>(); blobPending = new HashMap<>(); this.currAssignment = currAssignment; recoverBlobReferences(portToAssignments); } + public AsyncLocalizer(Map<String, Object> conf, AtomicReference<Map<Long, LocalAssignment>> currAssignment, + Map<Integer, LocalAssignment> portToAssignments) throws IOException { + this(conf, AdvancedFSOps.make(conf), ConfigUtils.supervisorLocalDir(conf), currAssignment, portToAssignments); + } + + @VisibleForTesting + LocallyCachedBlob getTopoJar(final String topologyId) throws IOException { + String topoJarKey = ConfigUtils.masterStormJarKey(topologyId); + LocallyCachedBlob topoJar = topologyBlobs.get(topoJarKey); + if (topoJar == null) { + topoJar = new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, + LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR); + topologyBlobs.put(topoJarKey, topoJar); + } + return topoJar; + } + + @VisibleForTesting + LocallyCachedBlob getTopoCode(final String topologyId) throws IOException { + String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId); + LocallyCachedBlob topoCode = topologyBlobs.get(topoCodeKey); + if (topoCode == null) { + topoCode = new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, + 
LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE); + topologyBlobs.put(topoCodeKey, topoCode); + } + return topoCode; + } + + @VisibleForTesting + LocallyCachedBlob getTopoConf(final String topologyId) throws IOException { + String topoConfKey = ConfigUtils.masterStormConfKey(topologyId); + LocallyCachedBlob topoConf = topologyBlobs.get(topoConfKey); + if (topoConf == null) { + topoConf = new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, + LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF); + topologyBlobs.put(topoConfKey, topoConf); + } + return topoConf; + } + + public synchronized CompletableFuture<Void> requestDownloadTopologyBlobs(final LocalAssignment assignment, final int port, + final BlobChangingCallback cb) throws IOException { + final String topologyId = assignment.get_topology_id(); + + CompletableFuture<Void> baseBlobs = requestDownloadBaseTopologyBlobs(assignment, port, cb); + return baseBlobs.thenComposeAsync((v) -> { + LocalDownloadedResource localResource = blobPending.get(topologyId); + if (localResource == null) { + Supplier<Void> supplier = new DownloadBlobs(topologyId, assignment.get_owner()); + localResource = new LocalDownloadedResource(CompletableFuture.supplyAsync(supplier, execService)); + blobPending.put(topologyId, localResource); + } + CompletableFuture<Void> r = localResource.reserve(port, assignment); + LOG.debug("Reserved blobs {} {}", topologyId, localResource); + return r; + }); + } + + @VisibleForTesting + synchronized CompletableFuture<Void> requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int port, + BlobChangingCallback cb) throws IOException { + PortAndAssignment pna = new PortAndAssignment(port, assignment); + final String topologyId = assignment.get_topology_id(); + + LocallyCachedBlob topoJar = getTopoJar(topologyId); + topoJar.addReference(pna, cb); + + LocallyCachedBlob topoCode = getTopoCode(topologyId); + topoCode.addReference(pna, cb); + + LocallyCachedBlob topoConf = 
getTopoConf(topologyId); + topoConf.addReference(pna, cb); + + CompletableFuture<Void> ret = topologyBasicDownloaded.get(topologyId); + if (ret == null) { + ret = downloadOrUpdate(topoJar, topoCode, topoConf); + } + return ret; + } + + private static final int ATTEMPTS_INTERVAL_TIME = 100; + + private CompletableFuture<Void> downloadOrUpdate(LocallyCachedBlob ... blobs) { + CompletableFuture<Void> [] all = new CompletableFuture[blobs.length]; + for (int i = 0; i < blobs.length; i++) { + final LocallyCachedBlob blob = blobs[i]; + all[i] = CompletableFuture.runAsync(() -> { + LOG.debug("STARTING download of {}", blob); + try (ClientBlobStore blobStore = ServerUtils.getClientBlobStoreForSupervisor(conf)) { + boolean done = false; + long failures = 0; + while (!done) { + try { + synchronized (blob) { + long localVersion = blob.getLocalVersion(); + long remoteVersion = blob.getRemoteVersion(blobStore); + if (localVersion != remoteVersion) { + try { + long newVersion = blob.downloadToTempLocation(blobStore); + blob.informAllOfChangeAndWaitForConsensus(); + blob.commitNewVersion(newVersion); + blob.informAllChangeComplete(); + } finally { + blob.cleanupOrphanedData(); + } + } + } + done = true; + } catch (Exception e) { + failures++; + if (failures > blobDownloadRetries) { + throw new RuntimeException("Could not download...", e); + } + LOG.warn("Failed to download blob {} will try again in {} ms", blob, ATTEMPTS_INTERVAL_TIME, e); + Utils.sleep(ATTEMPTS_INTERVAL_TIME); + } + } + } + LOG.debug("FINISHED download of {}", blob); --- End diff -- Should this "FINISHED download" message be logged at info level instead of debug?
---