Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2345#discussion_r143576368
--- Diff:
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
@@ -156,17 +146,141 @@ public AsyncLocalizer(Map<String, Object> conf,
AtomicReference<Map<Long, LocalA
DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
execService = Executors.newScheduledThreadPool(threadPoolSize,
- new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer
Executor").build());
+ new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer
Executor - %d").build());
reconstructLocalizedResources();
symlinksDisabled =
(boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
- basicPending = new HashMap<>();
blobPending = new HashMap<>();
this.currAssignment = currAssignment;
recoverBlobReferences(portToAssignments);
}
+ public AsyncLocalizer(Map<String, Object> conf,
AtomicReference<Map<Long, LocalAssignment>> currAssignment,
+ Map<Integer, LocalAssignment> portToAssignments)
throws IOException {
+ this(conf, AdvancedFSOps.make(conf),
ConfigUtils.supervisorLocalDir(conf), currAssignment, portToAssignments);
+ }
+
+ @VisibleForTesting
+ LocallyCachedBlob getTopoJar(final String topologyId) throws
IOException {
+ String topoJarKey = ConfigUtils.masterStormJarKey(topologyId);
+ LocallyCachedBlob topoJar = topologyBlobs.get(topoJarKey);
+ if (topoJar == null) {
+ topoJar = new LocallyCachedTopologyBlob(topologyId,
isLocalMode, conf, fsOps,
+ LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR);
+ topologyBlobs.put(topoJarKey, topoJar);
+ }
+ return topoJar;
+ }
+
+ @VisibleForTesting
+ LocallyCachedBlob getTopoCode(final String topologyId) throws
IOException {
+ String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
+ LocallyCachedBlob topoCode = topologyBlobs.get(topoCodeKey);
+ if (topoCode == null) {
+ topoCode = new LocallyCachedTopologyBlob(topologyId,
isLocalMode, conf, fsOps,
+ LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE);
+ topologyBlobs.put(topoCodeKey, topoCode);
+ }
+ return topoCode;
+ }
+
+ @VisibleForTesting
+ LocallyCachedBlob getTopoConf(final String topologyId) throws
IOException {
+ String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
+ LocallyCachedBlob topoConf = topologyBlobs.get(topoConfKey);
+ if (topoConf == null) {
+ topoConf = new LocallyCachedTopologyBlob(topologyId,
isLocalMode, conf, fsOps,
+ LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF);
+ topologyBlobs.put(topoConfKey, topoConf);
+ }
+ return topoConf;
+ }
+
+ public synchronized CompletableFuture<Void>
requestDownloadTopologyBlobs(final LocalAssignment assignment, final int port,
+
final BlobChangingCallback cb) throws IOException {
+ final String topologyId = assignment.get_topology_id();
+
+ CompletableFuture<Void> baseBlobs =
requestDownloadBaseTopologyBlobs(assignment, port, cb);
+ return baseBlobs.thenComposeAsync((v) -> {
+ LocalDownloadedResource localResource =
blobPending.get(topologyId);
+ if (localResource == null) {
+ Supplier<Void> supplier = new DownloadBlobs(topologyId,
assignment.get_owner());
+ localResource = new
LocalDownloadedResource(CompletableFuture.supplyAsync(supplier, execService));
+ blobPending.put(topologyId, localResource);
+ }
+ CompletableFuture<Void> r = localResource.reserve(port,
assignment);
+ LOG.debug("Reserved blobs {} {}", topologyId, localResource);
+ return r;
+ });
+ }
+
+ @VisibleForTesting
+ synchronized CompletableFuture<Void>
requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int
port,
+
BlobChangingCallback cb) throws IOException {
+ PortAndAssignment pna = new PortAndAssignment(port, assignment);
+ final String topologyId = assignment.get_topology_id();
+
+ LocallyCachedBlob topoJar = getTopoJar(topologyId);
+ topoJar.addReference(pna, cb);
+
+ LocallyCachedBlob topoCode = getTopoCode(topologyId);
+ topoCode.addReference(pna, cb);
+
+ LocallyCachedBlob topoConf = getTopoConf(topologyId);
+ topoConf.addReference(pna, cb);
+
+ CompletableFuture<Void> ret =
topologyBasicDownloaded.get(topologyId);
+ if (ret == null) {
+ ret = downloadOrUpdate(topoJar, topoCode, topoConf);
+ }
+ return ret;
+ }
+
+ private static final int ATTEMPTS_INTERVAL_TIME = 100;
+
+ private CompletableFuture<Void> downloadOrUpdate(LocallyCachedBlob ...
blobs) {
+ CompletableFuture<Void> [] all = new
CompletableFuture[blobs.length];
+ for (int i = 0; i < blobs.length; i++) {
+ final LocallyCachedBlob blob = blobs[i];
+ all[i] = CompletableFuture.runAsync(() -> {
+ LOG.debug("STARTING download of {}", blob);
+ try (ClientBlobStore blobStore =
ServerUtils.getClientBlobStoreForSupervisor(conf)) {
+ boolean done = false;
+ long failures = 0;
+ while (!done) {
+ try {
+ synchronized (blob) {
+ long localVersion = blob.getLocalVersion();
+ long remoteVersion =
blob.getRemoteVersion(blobStore);
+ if (localVersion != remoteVersion) {
+ try {
+ long newVersion =
blob.downloadToTempLocation(blobStore);
+
blob.informAllOfChangeAndWaitForConsensus();
+ blob.commitNewVersion(newVersion);
+ blob.informAllChangeComplete();
+ } finally {
+ blob.cleanupOrphanedData();
+ }
+ }
+ }
+ done = true;
+ } catch (Exception e) {
+ failures++;
+ if (failures > blobDownloadRetries) {
+ throw new RuntimeException("Could not
download...", e);
+ }
+ LOG.warn("Failed to download blob {} will try
again in {} ms", blob, ATTEMPTS_INTERVAL_TIME, e);
+ Utils.sleep(ATTEMPTS_INTERVAL_TIME);
+ }
+ }
+ }
+ LOG.debug("FINISHED download of {}", blob);
--- End diff --
I used this log message for debugging. If you see a good reason for it to be
logged at info, I can make it info, but I didn't see a lot of value in it.
---