>From Murtadha Hubail <[email protected]>:
Murtadha Hubail has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4323 )
Change subject: [NO ISSUE][ING] Allow Active Runtime Workers To Exit Gracefully
......................................................................
[NO ISSUE][ING] Allow Active Runtime Workers To Exit Gracefully
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Notify ActiveManager when active runtime storage side
workers are initialized and deinitialized.
- Track active runtime workers in ActiveManager.
- During ActiveManager shutdown, allow active runtime storage side
workers to exit gracefully with a timeout. This is done to allow
the active runtime storage side to process the last messages sent
from the ingestion side before the workers are interrupted.
Change-Id: I6d1b331e284156a8ee8fab441054d50cace36603
---
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
2 files changed, 41 insertions(+), 3 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/23/4323/1
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 4adffda..dfa8b2f 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +43,7 @@
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.Span;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -57,6 +59,7 @@
private final String nodeId;
private final INCServiceContext serviceCtx;
private volatile boolean shutdown;
+ private final Set<Long> activeWorkers = Collections.synchronizedSet(new
HashSet<>());
public ActiveManager(ExecutorService executor, String nodeId, long
activeMemoryBudget, int frameSize,
INCServiceContext serviceCtx) throws HyracksDataException {
@@ -156,9 +159,19 @@
LOGGER.log(Level.WARN, "Timed out waiting to stop runtime: " +
entry.getKey(), e);
}
});
+ waitForActiveWorkers();
LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
}
+    /**
+     * Registers the calling thread as an active runtime worker by adding its thread id to
+     * {@code activeWorkers}.
+     *
+     * @param runtimeId
+     *            the id of the active runtime partition being initialized (not read by this method)
+     */
+    public void runtimePartitionInitialzied(ActiveRuntimeId runtimeId) {
+        final long workerId = Thread.currentThread().getId();
+        activeWorkers.add(workerId);
+    }
+
+    /**
+     * Unregisters the calling thread as an active runtime worker: logs the event at debug level and
+     * removes the thread id from {@code activeWorkers}.
+     *
+     * @param runtimeId
+     *            the id of the active runtime partition being deinitialized (used for logging only)
+     */
+    public void runtimePartitionDeinitialzied(ActiveRuntimeId runtimeId) {
+        final Thread worker = Thread.currentThread();
+        LOGGER.debug("worker {} for active runtime {} was deinitialzied", worker.getName(), runtimeId);
+        activeWorkers.remove(worker.getId());
+    }
+
@SuppressWarnings("squid:S1181") // Catch Error
private void stopRuntime(ActiveManagerMessage message) {
StopRuntimeParameters content = (StopRuntimeParameters)
message.getPayload();
@@ -189,4 +202,22 @@
}
}
+    /**
+     * Waits for the tracked active runtime workers to exit.
+     * <p>
+     * Polls {@code activeWorkers} once per second and returns as soon as it is empty. If
+     * {@code SHUTDOWN_TIMEOUT_SECS} seconds elapse while workers are still registered, a warning with
+     * the number of remaining workers is logged.
+     * <p>
+     * If the waiting thread is interrupted, its interrupt status is restored and the wait is abandoned
+     * immediately. Staying in the loop with the interrupt flag set would make every subsequent sleep
+     * throw at once, turning the loop into a busy spin that logs a warning on each iteration until the
+     * timeout expires.
+     */
+    private void waitForActiveWorkers() {
+        final Span span = Span.start(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
+        while (!span.elapsed()) {
+            if (activeWorkers.isEmpty()) {
+                return;
+            }
+            try {
+                TimeUnit.SECONDS.sleep(1);
+            } catch (InterruptedException e) {
+                LOGGER.warn("interrupted while waiting for active workers to exit", e);
+                Thread.currentThread().interrupt();
+                // stop waiting: with the interrupt flag set, any further sleep would throw immediately
+                return;
+            }
+        }
+        if (!activeWorkers.isEmpty()) {
+            LOGGER.warn("{} seconds passed and {} active workers are still active", SHUTDOWN_TIMEOUT_SECS,
+                    activeWorkers.size());
+        }
+    }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 94ae75c..1c23785 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -92,6 +92,8 @@
private final long traceCategory;
+ private final ActiveRuntimeId runtimeId;
+
public FeedMetaStoreNodePushable(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider,
int partition, int nPartitions, IOperatorDescriptor coreOperator,
FeedConnectionId feedConnectionId,
Map<String, String> feedPolicyProperties,
FeedMetaOperatorDescriptor feedMetaOperatorDescriptor)
@@ -110,14 +112,15 @@
this.opDesc = feedMetaOperatorDescriptor;
tracer = ctx.getJobletContext().getServiceContext().getTracer();
traceCategory = tracer.getRegistry().get(TraceUtils.STORAGE);
+ runtimeId = new ActiveRuntimeId(connectionId.getFeedId(),
+ runtimeType.toString() + "." + connectionId.getDatasetName(),
partition);
}
@Override
public void open() throws HyracksDataException {
- ActiveRuntimeId runtimeId = new
ActiveRuntimeId(connectionId.getFeedId(),
- runtimeType.toString() + "." + connectionId.getDatasetName(),
partition);
try {
initializeNewFeedRuntime(runtimeId);
+ feedManager.runtimePartitionInitialzied(runtimeId);
insertOperator.open();
} catch (Exception e) {
LOGGER.log(Level.WARN, "Failed to open feed store operator", e);
@@ -164,7 +167,11 @@
@Override
public void close() throws HyracksDataException {
- writer.close();
+ try {
+ writer.close();
+ } finally {
+ feedManager.runtimePartitionDeinitialzied(runtimeId);
+ }
}
@Override
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4323
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Change-Id: I6d1b331e284156a8ee8fab441054d50cace36603
Gerrit-Change-Number: 4323
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <[email protected]>
Gerrit-MessageType: newchange