This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new dcaf4fb STORM-3740 cancel downloading blobs when releasing worker slot (#3374)
dcaf4fb is described below
commit dcaf4fbded8c22a5e6a5ba2cf1f04b08945f9e7c
Author: agresch <[email protected]>
AuthorDate: Fri Feb 5 10:12:03 2021 -0600
STORM-3740 cancel downloading blobs when releasing worker slot (#3374)
---
.../org/apache/storm/daemon/supervisor/Slot.java | 24 ++++++++++++++++++++--
.../org/apache/storm/localizer/AsyncLocalizer.java | 9 +++-----
2 files changed, 25 insertions(+), 8 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 0079fa0..18fab2d 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -286,6 +286,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
assert (dynamicState.container.areAllProcessesDead());
dynamicState.container.cleanUp();
+ dynamicState.cancelPendingBlobs();
staticState.localizer.releaseSlotFor(dynamicState.currentAssignment,
staticState.port);
DynamicState ret = dynamicState.withCurrentAssignment(null, null);
if (nextState != null) {
@@ -311,6 +312,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
dynamicState =
dynamicState.withChangingBlobs(Collections.emptySet());
}
+ dynamicState.cancelPendingBlobs();
if (!dynamicState.pendingChangingBlobs.isEmpty()) {
dynamicState =
dynamicState.withPendingChangingBlobs(Collections.emptySet(), null);
}
@@ -404,6 +406,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
if
(!EquivalenceUtils.areLocalAssignmentsEquivalent(dynamicState.newAssignment,
dynamicState.pendingLocalization)) {
//Scheduling changed
+ dynamicState.cancelPendingBlobs();
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization,
staticState.port);
// Switch to the new assignment even if localization hasn't
completed, or go to empty state
// if no new assignment.
@@ -439,10 +442,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
} else {
LOG.error("{}", e.getCause().getMessage());
}
- // release the reference on all blobs associated with this
topology.
+
+ // release the reference on all blobs associated with this worker.
+ dynamicState.cancelPendingBlobs();
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization,
staticState.port);
// we wait for 3 seconds
Time.sleepSecs(3);
+
//Try again, or go to empty if assignment has been nulled
return prepareForNewAssignmentNoWorkersRunning(dynamicState
.withPendingLocalization(null, null),
@@ -475,10 +481,12 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
// but the container is already not running.
LOG.info("SLOT {}: Assignment Changed from {} to {}",
staticState.port,
dynamicState.currentAssignment,
dynamicState.newAssignment);
+ dynamicState.cancelPendingBlobs();
if (dynamicState.currentAssignment != null) {
staticState.localizer.releaseSlotFor(dynamicState.currentAssignment,
staticState.port);
}
-
staticState.localizer.releaseSlotFor(dynamicState.pendingChangingBlobsAssignment,
staticState.port);
+
staticState.localizer.releaseSlotFor(dynamicState.pendingChangingBlobsAssignment,
+ staticState.port);
return
prepareForNewAssignmentNoWorkersRunning(dynamicState.withCurrentAssignment(null,
null),
staticState);
}
@@ -1288,6 +1296,18 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
pendingChangingBlobs,
pendingChangingBlobsAssignment,
this.slotMetrics);
}
+
+ private void cancelPendingBlobs() {
+ // Make sure any downloading blobs in the background are stopped.
+ // This prevents a race condition where we could be adding references to a
+ // delayed downloading blob after the slot gets released, causing orphaned blobs.
+ for (Future future : pendingChangingBlobs) {
+ if (!future.isDone()) {
+ LOG.info("Canceling download of {}", future);
+ future.cancel(true);
+ }
+ }
+ }
}
static class TopoProfileAction {
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 2c3eaa1..32015ce 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -207,7 +207,8 @@ public class AsyncLocalizer implements AutoCloseable {
}
/**
- * Request that all of the blobs necessary for this topology be downloaded.
+ * Request that all of the blobs necessary for this topology be downloaded. Note that this adds references to
+ * blobs asynchronously in background threads.
*
* @param assignment the assignment that needs the blobs
* @param port the port the assignment is a part of
@@ -443,6 +444,7 @@ public class AsyncLocalizer implements AutoCloseable {
* @throws IOException on any error
*/
public void releaseSlotFor(LocalAssignment assignment, int port) throws
IOException {
+
PortAndAssignment pna = new PortAndAssignmentImpl(port, assignment);
final String topologyId = assignment.get_topology_id();
LOG.info("Releasing slot for {} {}", topologyId, port);
@@ -466,11 +468,6 @@ public class AsyncLocalizer implements AutoCloseable {
topoConfBlob.removeReference(pna);
}
-
- // ALERT: A possible race condition should have been resolved
- // by separating the thread pools into downloadExecService and
taskExecService
- // https://github.com/apache/storm/pull/3153
- // Will need further investigation if the race condition happens again
List<LocalResource> localResources;
try {
// Precondition1: Base blob stormconf.ser and stormcode.ser are
available