From Murtadha Hubail <[email protected]>:

Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4323 )


Change subject: [NO ISSUE][ING] Allow Active Runtime Workers To Exit Gracefully
......................................................................

[NO ISSUE][ING] Allow Active Runtime Workers To Exit Gracefully

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Notify ActiveManager when the storage-side workers of an active
  runtime are initialized and deinitialized.
- Track active runtime workers in ActiveManager.
- During ActiveManager shutdown, wait up to a timeout for the
  storage-side workers of active runtimes to exit gracefully. This
  lets the storage side process the last messages sent from the
  ingestion side before the workers are interrupted.

Change-Id: I6d1b331e284156a8ee8fab441054d50cace36603
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
2 files changed, 41 insertions(+), 3 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/23/4323/1

diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 4adffda..dfa8b2f 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -20,6 +20,7 @@

 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +43,7 @@
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.Span;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -57,6 +59,7 @@
     private final String nodeId;
     private final INCServiceContext serviceCtx;
     private volatile boolean shutdown;
+    private final Set<Long> activeWorkers = Collections.synchronizedSet(new 
HashSet<>());

     public ActiveManager(ExecutorService executor, String nodeId, long 
activeMemoryBudget, int frameSize,
             INCServiceContext serviceCtx) throws HyracksDataException {
@@ -156,9 +159,19 @@
                 LOGGER.log(Level.WARN, "Timed out waiting to stop runtime: " + 
entry.getKey(), e);
             }
         });
+        waitForActiveWorkers();
         LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
     }

+    public void runtimePartitionInitialzied(ActiveRuntimeId runtimeId) {
+        activeWorkers.add(Thread.currentThread().getId());
+    }
+
+    public void runtimePartitionDeinitialzied(ActiveRuntimeId runtimeId) {
+        LOGGER.debug("worker {} for active runtime {} was deinitialzied", 
Thread.currentThread().getName(), runtimeId);
+        activeWorkers.remove(Thread.currentThread().getId());
+    }
+
     @SuppressWarnings("squid:S1181") // Catch Error
     private void stopRuntime(ActiveManagerMessage message) {
         StopRuntimeParameters content = (StopRuntimeParameters) 
message.getPayload();
@@ -189,4 +202,22 @@
         }
     }

+    private void waitForActiveWorkers() {
+        final Span span = Span.start(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
+        while (!span.elapsed()) {
+            if (activeWorkers.isEmpty()) {
+                return;
+            }
+            try {
+                TimeUnit.SECONDS.sleep(1);
+            } catch (InterruptedException e) {
+                LOGGER.warn("interrupted while waiting for active workers to 
exit", e);
+                Thread.currentThread().interrupt();
+            }
+        }
+        if (!activeWorkers.isEmpty()) {
+            LOGGER.warn("{} seconds passed and {} active workers are still 
active", SHUTDOWN_TIMEOUT_SECS,
+                    activeWorkers.size());
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 94ae75c..1c23785 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -92,6 +92,8 @@

     private final long traceCategory;

+    private final ActiveRuntimeId runtimeId;
+
     public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, 
IRecordDescriptorProvider recordDescProvider,
             int partition, int nPartitions, IOperatorDescriptor coreOperator, 
FeedConnectionId feedConnectionId,
             Map<String, String> feedPolicyProperties, 
FeedMetaOperatorDescriptor feedMetaOperatorDescriptor)
@@ -110,14 +112,15 @@
         this.opDesc = feedMetaOperatorDescriptor;
         tracer = ctx.getJobletContext().getServiceContext().getTracer();
         traceCategory = tracer.getRegistry().get(TraceUtils.STORAGE);
+        runtimeId = new ActiveRuntimeId(connectionId.getFeedId(),
+                runtimeType.toString() + "." + connectionId.getDatasetName(), 
partition);
     }

     @Override
     public void open() throws HyracksDataException {
-        ActiveRuntimeId runtimeId = new 
ActiveRuntimeId(connectionId.getFeedId(),
-                runtimeType.toString() + "." + connectionId.getDatasetName(), 
partition);
         try {
             initializeNewFeedRuntime(runtimeId);
+            feedManager.runtimePartitionInitialzied(runtimeId);
             insertOperator.open();
         } catch (Exception e) {
             LOGGER.log(Level.WARN, "Failed to open feed store operator", e);
@@ -164,7 +167,11 @@

     @Override
     public void close() throws HyracksDataException {
-        writer.close();
+        try {
+            writer.close();
+        } finally {
+            feedManager.runtimePartitionDeinitialzied(runtimeId);
+        }
     }

     @Override

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4323
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Change-Id: I6d1b331e284156a8ee8fab441054d50cace36603
Gerrit-Change-Number: 4323
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <[email protected]>
Gerrit-MessageType: newchange

Reply via email to