abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1608
Change subject: Prevent hangs on active runtime stop ...................................................................... Prevent hangs on active runtime stop Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0 --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java 3 files changed, 39 insertions(+), 31 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/08/1608/1 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java index 2adce1c..12fc4e9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java @@ -49,25 +49,27 @@ } boolean continueIngestion = true; boolean failedIngestion = false; - while (continueIngestion) { - try { - // Start the adapter - adapter.start(partition, writer); - // Adapter has completed execution - continueIngestion = false; - } catch (Exception e) { - LOGGER.error("Exception during feed ingestion ", e); - continueIngestion = adapter.handleException(e); - failedIngestion = !continueIngestion; + try { + while (continueIngestion) { + try { + // Start the adapter + adapter.start(partition, writer); + // Adapter has completed execution + continueIngestion = false; + } catch (Exception e) { + LOGGER.error("Exception during feed ingestion ", e); + continueIngestion = adapter.handleException(e); + failedIngestion = !continueIngestion; + } + } + } finally { + // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the + // runtime manager + adapterManager.setFailed(failedIngestion); + adapterManager.setDone(true); + synchronized (adapterManager) { + adapterManager.notifyAll(); } } - // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the - // runtime manager - adapterManager.setFailed(failedIngestion); - adapterManager.setDone(true); - synchronized (adapterManager) { - adapterManager.notifyAll(); - } } - } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java index 7f5372b..d950dc5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java @@ -19,11 +19,14 @@ package org.apache.asterix.external.feed.runtime; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.asterix.active.EntityId; import org.apache.asterix.external.dataset.adapter.FeedAdapter; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.log4j.Level; import org.apache.log4j.Logger; /** @@ -51,7 +54,7 @@ private volatile boolean failed = false; public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter, - IFrameWriter writer, int partition) { + IFrameWriter writer, int partition) { this.ctx = ctx; this.feedId = entityId; this.feedAdapter = feedAdapter; @@ -63,20 +66,22 @@ execution = ctx.getExecutorService().submit(adapterExecutor); } - public void stop() throws InterruptedException { + public void stop() throws HyracksDataException, InterruptedException { try { - if (feedAdapter.stop()) { - // stop() returned true, we wait for the process termination - execution.get(); - } else { - // stop() returned false, we try to force shutdown - execution.cancel(true); - } + ctx.getExecutorService().submit(() -> { + if (!feedAdapter.stop()) { + execution.get(); + } + return null; + }).get(30, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e); + LOGGER.log(Level.WARN, "Interrupted while trying to stop an adapter runtime", e); throw e; - } catch (Exception exception) { - LOGGER.error("Unable to stop adapter " + feedAdapter, exception); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Exception while trying to stop an adapter runtime", e); + throw HyracksDataException.create(e); + } finally { + execution.cancel(true); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java index 3100704..590af01 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java @@ -24,6 +24,7 @@ import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveRuntime; +import org.apache.hyracks.api.exceptions.HyracksDataException; public class IngestionRuntime implements IActiveRuntime { @@ -50,7 +51,7 @@ } @Override - public void stop() throws InterruptedException { + public void stop() throws InterruptedException, HyracksDataException { adapterRuntimeManager.stop(); LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on partition " + runtimeId); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1608 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>