abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1608

Change subject: Prevent hangs on active runtime stop
......................................................................

Prevent hangs on active runtime stop

Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
3 files changed, 39 insertions(+), 31 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/08/1608/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index 2adce1c..12fc4e9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -49,25 +49,27 @@
         }
         boolean continueIngestion = true;
         boolean failedIngestion = false;
-        while (continueIngestion) {
-            try {
-                // Start the adapter
-                adapter.start(partition, writer);
-                // Adapter has completed execution
-                continueIngestion = false;
-            } catch (Exception e) {
-                LOGGER.error("Exception during feed ingestion ", e);
-                continueIngestion = adapter.handleException(e);
-                failedIngestion = !continueIngestion;
+        try {
+            while (continueIngestion) {
+                try {
+                    // Start the adapter
+                    adapter.start(partition, writer);
+                    // Adapter has completed execution
+                    continueIngestion = false;
+                } catch (Exception e) {
+                    LOGGER.error("Exception during feed ingestion ", e);
+                    continueIngestion = adapter.handleException(e);
+                    failedIngestion = !continueIngestion;
+                }
+            }
+        } finally {
+            // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the
+            // runtime manager
+            adapterManager.setFailed(failedIngestion);
+            adapterManager.setDone(true);
+            synchronized (adapterManager) {
+                adapterManager.notifyAll();
             }
         }
-        // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the
-        // runtime manager
-        adapterManager.setFailed(failedIngestion);
-        adapterManager.setDone(true);
-        synchronized (adapterManager) {
-            adapterManager.notifyAll();
-        }
     }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 7f5372b..d950dc5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -19,11 +19,14 @@
 package org.apache.asterix.external.feed.runtime;
 
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
@@ -51,7 +54,7 @@
     private volatile boolean failed = false;
 
     public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter,
-                                 IFrameWriter writer, int partition) {
+            IFrameWriter writer, int partition) {
         this.ctx = ctx;
         this.feedId = entityId;
         this.feedAdapter = feedAdapter;
@@ -63,20 +66,22 @@
         execution = ctx.getExecutorService().submit(adapterExecutor);
     }
 
-    public void stop() throws InterruptedException {
+    public void stop() throws HyracksDataException, InterruptedException {
         try {
-            if (feedAdapter.stop()) {
-                // stop() returned true, we wait for the process termination
-                execution.get();
-            } else {
-                // stop() returned false, we try to force shutdown
-                execution.cancel(true);
-            }
+            ctx.getExecutorService().submit(() -> {
+                if (!feedAdapter.stop()) {
+                    execution.get();
+                }
+                return null;
+            }).get(30, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-            LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
+            LOGGER.log(Level.WARN, "Interrupted while trying to stop an adapter runtime", e);
             throw e;
-        } catch (Exception exception) {
-            LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Exception while trying to stop an adapter runtime", e);
+            throw HyracksDataException.create(e);
+        } finally {
+            execution.cancel(true);
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 3100704..590af01 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveRuntime;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class IngestionRuntime implements IActiveRuntime {
 
@@ -50,7 +51,7 @@
     }
 
     @Override
-    public void stop() throws InterruptedException {
+    public void stop() throws InterruptedException, HyracksDataException {
         adapterRuntimeManager.stop();
         LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on partition " + runtimeId);
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1608
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to