This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new dcaf4fb  STORM-3740 cancel downloading blobs when releasing worker slot (#3374)
dcaf4fb is described below

commit dcaf4fbded8c22a5e6a5ba2cf1f04b08945f9e7c
Author: agresch <[email protected]>
AuthorDate: Fri Feb 5 10:12:03 2021 -0600

    STORM-3740 cancel downloading blobs when releasing worker slot (#3374)
---
 .../org/apache/storm/daemon/supervisor/Slot.java   | 24 ++++++++++++++++++++--
 .../org/apache/storm/localizer/AsyncLocalizer.java |  9 +++-----
 2 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 0079fa0..18fab2d 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -286,6 +286,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         assert (dynamicState.container.areAllProcessesDead());
 
         dynamicState.container.cleanUp();
+        dynamicState.cancelPendingBlobs();
         staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, 
staticState.port);
         DynamicState ret = dynamicState.withCurrentAssignment(null, null);
         if (nextState != null) {
@@ -311,6 +312,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             dynamicState = 
dynamicState.withChangingBlobs(Collections.emptySet());
         }
 
+        dynamicState.cancelPendingBlobs();
         if (!dynamicState.pendingChangingBlobs.isEmpty()) {
             dynamicState = 
dynamicState.withPendingChangingBlobs(Collections.emptySet(), null);
         }
@@ -404,6 +406,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
 
             if 
(!EquivalenceUtils.areLocalAssignmentsEquivalent(dynamicState.newAssignment, 
dynamicState.pendingLocalization)) {
                 //Scheduling changed
+                dynamicState.cancelPendingBlobs();
                 
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, 
staticState.port);
                 // Switch to the new assignment even if localization hasn't 
completed, or go to empty state
                 // if no new assignment.
@@ -439,10 +442,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             } else {
                 LOG.error("{}", e.getCause().getMessage());
             }
-            // release the reference on all blobs associated with this 
topology.
+
+            // release the reference on all blobs associated with this worker.
+            dynamicState.cancelPendingBlobs();
             
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, 
staticState.port);
             // we wait for 3 seconds
             Time.sleepSecs(3);
+
             //Try again, or go to empty if assignment has been nulled
             return prepareForNewAssignmentNoWorkersRunning(dynamicState
                 .withPendingLocalization(null, null),
@@ -475,10 +481,12 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             // but the container is already not running.
             LOG.info("SLOT {}: Assignment Changed from {} to {}", 
staticState.port,
                      dynamicState.currentAssignment, 
dynamicState.newAssignment);
+            dynamicState.cancelPendingBlobs();
             if (dynamicState.currentAssignment != null) {
                 
staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, 
staticState.port);
             }
-            
staticState.localizer.releaseSlotFor(dynamicState.pendingChangingBlobsAssignment,
 staticState.port);
+            
staticState.localizer.releaseSlotFor(dynamicState.pendingChangingBlobsAssignment,
+                    staticState.port);
             return 
prepareForNewAssignmentNoWorkersRunning(dynamicState.withCurrentAssignment(null,
 null),
                                                            staticState);
         }
@@ -1288,6 +1296,18 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                                     pendingChangingBlobs,
                                     pendingChangingBlobsAssignment, 
this.slotMetrics);
         }
+
+        private void cancelPendingBlobs() {
+            // Make sure any downloading blobs in the background are stopped.
+            // This prevents a race condition where we could be adding 
references to a
+            // delayed downloading blob after the slot gets released, causing 
orphaned blobs.
+            for (Future future : pendingChangingBlobs) {
+                if (!future.isDone()) {
+                    LOG.info("Canceling download of {}", future);
+                    future.cancel(true);
+                }
+            }
+        }
     }
 
     static class TopoProfileAction {
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 2c3eaa1..32015ce 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -207,7 +207,8 @@ public class AsyncLocalizer implements AutoCloseable {
     }
 
     /**
-     * Request that all of the blobs necessary for this topology be downloaded.
+     * Request that all of the blobs necessary for this topology be 
downloaded.  Note that this adds references to
+     * blobs asynchronously in background threads.
      *
      * @param assignment the assignment that needs the blobs
      * @param port       the port the assignment is a part of
@@ -443,6 +444,7 @@ public class AsyncLocalizer implements AutoCloseable {
      * @throws IOException on any error
      */
     public void releaseSlotFor(LocalAssignment assignment, int port) throws 
IOException {
+
         PortAndAssignment pna = new PortAndAssignmentImpl(port, assignment);
         final String topologyId = assignment.get_topology_id();
         LOG.info("Releasing slot for {} {}", topologyId, port);
@@ -466,11 +468,6 @@ public class AsyncLocalizer implements AutoCloseable {
             topoConfBlob.removeReference(pna);
         }
 
-
-        // ALERT: A possible race condition should have been resolved
-        // by separating the thread pools into downloadExecService and 
taskExecService
-        // https://github.com/apache/storm/pull/3153
-        // Will need further investigation if the race condition happens again
         List<LocalResource> localResources;
         try {
             // Precondition1: Base blob stormconf.ser and stormcode.ser are 
available

Reply via email to