abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1921

Change subject: [ASTERIXDB-2008][CLUS] Only add pending removal if node known
......................................................................

[ASTERIXDB-2008][CLUS] Only add pending removal if node known

[ASTERIXDB-2023][ING] Introduce Enums instead of using bytes

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Only nodes that are known to the cluster manager are added
  to the list of nodes pending removal. Other nodes are ignored.
- Enums introduced:
  - ActiveManagerMessage.Kind
  - ActivePartitionMessage.Event

Change-Id: I7044896559798426c04a3f46861bc5335b25d140
---
M asterixdb/asterix-active/pom.xml
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M 
hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
33 files changed, 255 insertions(+), 172 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/21/1921/1

diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 3dd24b6..6568795 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -31,10 +31,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
     </dependency>
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index fcf2be9..1dbacf5 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -26,6 +26,8 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActiveStatsResponse;
@@ -38,7 +40,6 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.log4j.Logger;
 
 public class ActiveManager {
 
@@ -86,15 +87,16 @@
     }
 
     public void submit(ActiveManagerMessage message) throws 
HyracksDataException {
+        LOGGER.log(Level.INFO, "Message of type " + message.getKind() + " 
received in " + nodeId);
         switch (message.getKind()) {
-            case ActiveManagerMessage.STOP_ACTIVITY:
+            case STOP_ACTIVITY:
                 stopRuntime(message);
                 break;
-            case ActiveManagerMessage.REQUEST_STATS:
+            case REQUEST_STATS:
                 requestStats((StatsRequestMessage) message);
                 break;
             default:
-                LOGGER.warn("Unknown message type received: " + 
message.getKind());
+                LOGGER.warning("Unknown message type received: " + 
message.getKind());
         }
     }
 
@@ -104,7 +106,7 @@
             IActiveRuntime runtime = runtimes.get(runtimeId);
             long reqId = message.getReqId();
             if (runtime == null) {
-                LOGGER.warn("Request stats of a runtime that is not registered 
" + runtimeId);
+                LOGGER.warning("Request stats of a runtime that is not 
registered " + runtimeId);
                 // Send a failure message
                 ((NodeControllerService) serviceCtx.getControllerService())
                         .sendApplicationMessageToCC(
@@ -124,7 +126,7 @@
     }
 
     public void shutdown() {
-        LOGGER.warn("Shutting down ActiveManager on node " + nodeId);
+        LOGGER.warning("Shutting down ActiveManager on node " + nodeId);
         Map<ActiveRuntimeId, Future<Void>> stopFutures = new HashMap<>();
         shutdown = true;
         runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId, 
executor.submit(() -> {
@@ -136,29 +138,29 @@
             try {
                 entry.getValue().get(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
             } catch (InterruptedException e) {
-                LOGGER.warn("Interrupted waiting to stop runtime: " + 
entry.getKey());
+                LOGGER.warning("Interrupted waiting to stop runtime: " + 
entry.getKey());
                 Thread.currentThread().interrupt();
             } catch (ExecutionException e) {
-                LOGGER.warn("Exception while stopping runtime: " + 
entry.getKey(), e);
+                LOGGER.log(Level.WARNING, "Exception while stopping runtime: " 
+ entry.getKey(), e);
             } catch (TimeoutException e) {
-                LOGGER.warn("Timed out waiting to stop runtime: " + 
entry.getKey(), e);
+                LOGGER.log(Level.WARNING, "Timed out waiting to stop runtime: 
" + entry.getKey(), e);
             }
         });
-        LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
+        LOGGER.warning("Shutdown ActiveManager on node " + nodeId + " 
complete");
     }
 
     private void stopRuntime(ActiveManagerMessage message) {
         ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
         IActiveRuntime runtime = runtimes.get(runtimeId);
         if (runtime == null) {
-            LOGGER.warn("Request to stop a runtime that is not registered " + 
runtimeId);
+            LOGGER.warning("Request to stop a runtime that is not registered " 
+ runtimeId);
         } else {
             executor.execute(() -> {
                 try {
                     stopIfRunning(runtimeId, runtime);
                 } catch (Exception e) {
                     // TODO(till) Figure out a better way to handle failure to 
stop a runtime
-                    LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
+                    LOGGER.log(Level.WARNING, "Failed to stop runtime: " + 
runtimeId, e);
                 }
             });
         }
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index a7d7796..adef590 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -22,6 +22,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -89,7 +90,7 @@
         try {
             // notify cc that runtime has been registered
             ctx.sendApplicationMessageToCC(new 
ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
-                    ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null), 
null);
+                    Event.RUNTIME_REGISTERED, null), null);
             start();
         } catch (InterruptedException e) {
             LOGGER.log(Level.INFO, "initialize() interrupted on 
ActiveSourceOperatorNodePushable", e);
@@ -112,7 +113,7 @@
         activeManager.deregisterRuntime(runtimeId);
         try {
             ctx.sendApplicationMessageToCC(new 
ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
-                    ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null), 
null);
+                    Event.RUNTIME_DEREGISTERED, null), null);
         } catch (Exception e) {
             LOGGER.log(Level.INFO, "deinitialize() failed on 
ActiveSourceOperatorNodePushable", e);
             throw HyracksDataException.create(e);
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
index 0a36216..6ce696a 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
@@ -27,6 +27,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class SingleThreadEventProcessor<T> implements Runnable {
@@ -71,8 +72,8 @@
     public void stop() throws HyracksDataException, InterruptedException {
         future.cancel(true);
         executorService.shutdown();
-        if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
-            throw 
HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+        if (!executorService.awaitTermination(60, TimeUnit.MINUTES)) {
+            throw new 
RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
         }
     }
 }
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 9772698..bef418b 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -26,14 +26,16 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ActiveManagerMessage implements INcAddressedMessage {
-    public static final byte STOP_ACTIVITY = 0x00;
-    public static final byte REQUEST_STATS = 0x01;
+    public enum Kind {
+        STOP_ACTIVITY,
+        REQUEST_STATS
+    }
 
     private static final long serialVersionUID = 1L;
-    private final byte kind;
+    private final Kind kind;
     private final Serializable payload;
 
-    public ActiveManagerMessage(byte kind, Serializable payload) {
+    public ActiveManagerMessage(Kind kind, Serializable payload) {
         this.kind = kind;
         this.payload = payload;
     }
@@ -42,7 +44,7 @@
         return payload;
     }
 
-    public byte getKind() {
+    public Kind getKind() {
         return kind;
     }
 
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index a47d5a5..9ace417 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -29,17 +29,19 @@
 import org.apache.hyracks.api.job.JobId;
 
 public class ActivePartitionMessage implements ICcAddressedMessage {
+    public enum Event {
+        RUNTIME_REGISTERED,
+        RUNTIME_DEREGISTERED,
+        GENERIC_EVENT
+    }
 
     private static final long serialVersionUID = 1L;
-    public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
-    public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
-    public static final byte GENERIC_EVENT = 0x02;
     private final ActiveRuntimeId activeRuntimeId;
     private final JobId jobId;
     private final Serializable payload;
-    private final byte event;
+    private final Event event;
 
-    public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId 
jobId, byte event, Serializable payload) {
+    public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId 
jobId, Event event, Serializable payload) {
         this.activeRuntimeId = activeRuntimeId;
         this.jobId = jobId;
         this.event = event;
@@ -58,7 +60,7 @@
         return payload;
     }
 
-    public byte getEvent() {
+    public Event getEvent() {
         return event;
     }
 
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
index 8fa5f19..d43f00e 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
@@ -24,8 +24,8 @@
     private static final long serialVersionUID = 1L;
     private final long reqId;
 
-    public StatsRequestMessage(byte kind, Serializable payload, long reqId) {
-        super(kind, payload);
+    public StatsRequestMessage(Serializable payload, long reqId) {
+        super(Kind.REQUEST_STATS, payload);
         this.reqId = reqId;
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 995e372..493cc0a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -38,8 +38,8 @@
 import org.apache.asterix.active.IRetryPolicy;
 import org.apache.asterix.active.IRetryPolicyFactory;
 import org.apache.asterix.active.NoRetryPolicyFactory;
-import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.active.message.StatsRequestMessage;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.IMetadataLockManager;
@@ -68,6 +68,7 @@
 public abstract class ActiveEntityEventsListener implements 
IActiveEntityController {
 
     private static final Logger LOGGER = 
Logger.getLogger(ActiveEntityEventsListener.class.getName());
+    private static final Level level = Level.WARNING;
     private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, 
Kind.STATE_CHANGED, null, null);
     private static final EnumSet<ActivityState> TRANSITION_STATES = 
EnumSet.of(ActivityState.RESUMING,
             ActivityState.STARTING, ActivityState.STOPPING, 
ActivityState.RECOVERING);
@@ -130,7 +131,7 @@
     }
 
     protected synchronized void setState(ActivityState newState) {
-        LOGGER.log(Level.FINE, "State is being set to " + newState + " from " 
+ state);
+        LOGGER.log(level, "State of " + getEntityId() + "is being set to " + 
newState + " from " + state);
         this.prevState = state;
         this.state = newState;
         if (newState == ActivityState.SUSPENDED) {
@@ -142,7 +143,7 @@
     @Override
     public synchronized void notify(ActiveEvent event) {
         try {
-            LOGGER.fine("EventListener is notified.");
+            LOGGER.log(level, "EventListener is notified.");
             ActiveEvent.Kind eventKind = event.getEventKind();
             switch (eventKind) {
                 case JOB_CREATED:
@@ -172,22 +173,24 @@
     }
 
     protected synchronized void handle(ActivePartitionMessage message) {
-        if (message.getEvent() == 
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+        if (message.getEvent() == Event.RUNTIME_REGISTERED) {
             numRegistered++;
             if (numRegistered == locations.getLocations().length) {
                 setState(ActivityState.RUNNING);
             }
-        } else if (message.getEvent() == 
ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED) {
+        } else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) {
             numRegistered--;
         }
     }
 
     @SuppressWarnings("unchecked")
     protected void finish(ActiveEvent event) throws HyracksDataException {
+        LOGGER.log(level, "the job " + jobId + " finished");
         jobId = null;
         Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, 
List<Exception>>) event.getEventObject();
         JobStatus jobStatus = status.getLeft();
         List<Exception> exceptions = status.getRight();
+        LOGGER.log(level, "The job finished with status: " + jobStatus);
         if (jobStatus.equals(JobStatus.FAILURE)) {
             jobFailure = exceptions.isEmpty() ? new 
RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
                     : exceptions.get(0);
@@ -271,10 +274,10 @@
     @SuppressWarnings("unchecked")
     @Override
     public void refreshStats(long timeout) throws HyracksDataException {
-        LOGGER.log(Level.FINE, "refreshStats called");
+        LOGGER.log(level, "refreshStats called");
         synchronized (this) {
             if (state != ActivityState.RUNNING || isFetchingStats) {
-                LOGGER.log(Level.FINE,
+                LOGGER.log(level,
                         "returning immediately since state = " + state + " and 
fetchingStats = " + isFetchingStats);
                 return;
             } else {
@@ -287,8 +290,7 @@
         List<INcAddressedMessage> requests = new ArrayList<>();
         List<String> ncs = Arrays.asList(locations.getLocations());
         for (int i = 0; i < ncs.size(); i++) {
-            requests.add(new 
StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS,
-                    new ActiveRuntimeId(entityId, runtimeName, i), reqId));
+            requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, 
runtimeName, i), reqId));
         }
         try {
             List<String> responses = (List<String>) 
messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
@@ -348,32 +350,32 @@
 
     @Override
     public synchronized void recover() throws HyracksDataException {
-        LOGGER.log(Level.FINE, "Recover is called on " + entityId);
+        LOGGER.log(level, "Recover is called on " + entityId);
         if (recoveryTask != null) {
-            LOGGER.log(Level.FINE, "But recovery task for " + entityId + " is 
already there!! throwing an exception");
+            LOGGER.log(level, "But recovery task for " + entityId + " is 
already there!! throwing an exception");
             throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS);
         }
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
-            LOGGER.log(Level.FINE, "But it has no recovery policy, so it is 
set to permanent failure");
+            LOGGER.log(level, "But it has no recovery policy, so it is set to 
permanent failure");
             setState(ActivityState.PERMANENTLY_FAILED);
         } else {
             ExecutorService executor = 
appCtx.getServiceContext().getControllerService().getExecutor();
             IRetryPolicy policy = retryPolicyFactory.create(this);
             cancelRecovery = false;
             setState(ActivityState.TEMPORARILY_FAILED);
-            LOGGER.log(Level.FINE, "Recovery task has been submitted");
+            LOGGER.log(level, "Recovery task has been submitted");
             recoveryTask = executor.submit(() -> doRecover(policy));
         }
     }
 
     protected Void doRecover(IRetryPolicy policy)
             throws AlgebricksException, HyracksDataException, 
InterruptedException {
-        LOGGER.log(Level.FINE, "Actual Recovery task has started");
+        LOGGER.log(level, "Actual Recovery task has started");
         if (getState() != ActivityState.TEMPORARILY_FAILED) {
-            LOGGER.log(Level.FINE, "but its state is not temp failure and so 
we're just returning");
+            LOGGER.log(level, "but its state is not temp failure and so we're 
just returning");
             return null;
         }
-        LOGGER.log(Level.FINE, "calling the policy");
+        LOGGER.log(level, "calling the policy");
         while (policy.retry()) {
             synchronized (this) {
                 if (cancelRecovery) {
@@ -402,7 +404,7 @@
                     doStart(metadataProvider);
                     return null;
                 } catch (Exception e) {
-                    LOGGER.log(Level.WARNING, "Attempt to revive " + entityId 
+ " failed", e);
+                    LOGGER.log(level, "Attempt to revive " + entityId + " 
failed", e);
                     setState(ActivityState.TEMPORARILY_FAILED);
                     recoverFailure = e;
                 } finally {
@@ -515,10 +517,10 @@
         WaitForStateSubscriber subscriber;
         Future<Void> suspendTask;
         synchronized (this) {
-            LOGGER.log(Level.FINE, "suspending entity " + entityId);
-            LOGGER.log(Level.FINE, "Waiting for ongoing activities");
+            LOGGER.log(level, "suspending entity " + entityId);
+            LOGGER.log(level, "Waiting for ongoing activities");
             waitForNonTransitionState();
-            LOGGER.log(Level.FINE, "Proceeding with suspension. Current state 
is " + state);
+            LOGGER.log(level, "Proceeding with suspension. Current state is " 
+ state);
             if (state == ActivityState.STOPPED || state == 
ActivityState.PERMANENTLY_FAILED) {
                 suspended = true;
                 return;
@@ -536,12 +538,12 @@
                     EnumSet.of(ActivityState.SUSPENDED, 
ActivityState.TEMPORARILY_FAILED));
             suspendTask = 
metadataProvider.getApplicationContext().getServiceContext().getControllerService()
                     .getExecutor().submit(() -> doSuspend(metadataProvider));
-            LOGGER.log(Level.FINE, "Suspension task has been submitted");
+            LOGGER.log(level, "Suspension task has been submitted");
         }
         try {
-            LOGGER.log(Level.FINE, "Waiting for suspension task to complete");
+            LOGGER.log(level, "Waiting for suspension task to complete");
             suspendTask.get();
-            LOGGER.log(Level.FINE, "waiting for state to become SUSPENDED or 
TEMPORARILY_FAILED");
+            LOGGER.log(level, "waiting for state to become SUSPENDED or 
TEMPORARILY_FAILED");
             subscriber.sync();
         } catch (Exception e) {
             synchronized (this) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index c5e5dbb..5b576ac 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -53,7 +53,7 @@
         implements IActiveNotificationHandler, IJobLifecycleListener {
 
     private static final Logger LOGGER = 
Logger.getLogger(ActiveNotificationHandler.class.getName());
-    private static final boolean DEBUG = false;
+    private static final Level level = Level.WARNING;
     public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
     private final Map<EntityId, IActiveEntityEventsListener> 
entityEventListeners;
     private final Map<JobId, EntityId> jobId2EntityId;
@@ -73,13 +73,13 @@
         EntityId entityId = jobId2EntityId.get(event.getJobId());
         if (entityId != null) {
             IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
-            LOGGER.log(Level.FINE, "Next event is of type " + 
event.getEventKind());
+            LOGGER.log(level, "Next event is of type " + event.getEventKind());
             if (event.getEventKind() == Kind.JOB_FINISHED) {
-                LOGGER.log(Level.FINE, "Removing the job");
+                LOGGER.log(level, "Removing the job");
                 jobId2EntityId.remove(event.getJobId());
             }
             if (listener != null) {
-                LOGGER.log(Level.FINE, "Notifying the listener");
+                LOGGER.log(level, "Notifying the listener");
                 listener.notify(event);
             }
         } else {
@@ -91,34 +91,30 @@
 
     @Override
     public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) throws HyracksDataException {
-        LOGGER.log(Level.FINE,
+        LOGGER.log(level,
                 "notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) was called with jobId = " + jobId);
         Object property = 
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (property == null || !(property instanceof EntityId)) {
-            LOGGER.log(Level.FINE, "Job is not of type active job. property 
found to be: " + property);
+            LOGGER.log(level, "Job is not of type active job. property found 
to be: " + property);
             return;
         }
         EntityId entityId = (EntityId) property;
         monitorJob(jobId, entityId);
         boolean found = jobId2EntityId.get(jobId) != null;
-        LOGGER.log(Level.FINE, "Job was found to be: " + (found ? "Active" : 
"Inactive"));
+        LOGGER.log(level, "Job was found to be: " + (found ? "Active" : 
"Inactive"));
         add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, 
jobSpecification));
     }
 
     private synchronized void monitorJob(JobId jobId, EntityId entityId) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob 
activeJob) called with job id: " + jobId);
-            boolean found = jobId2EntityId.get(jobId) != null;
-            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? 
"Active" : "Inactive"));
-        }
+        LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called 
with job id: " + jobId);
+        boolean found = jobId2EntityId.get(jobId) != null;
+        LOGGER.log(level, "Job was found to be: " + (found ? "Active" : 
"Inactive"));
         if (entityEventListeners.containsKey(entityId)) {
             if (jobId2EntityId.containsKey(jobId)) {
                 LOGGER.severe("Job is already being monitored for job: " + 
jobId);
                 return;
             }
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "monitoring started for job id: " + 
jobId);
-            }
+            LOGGER.log(level, "monitoring started for job id: " + jobId);
         } else {
             LOGGER.info("No listener was found for the entity: " + entityId);
         }
@@ -140,9 +136,7 @@
         if (entityId != null) {
             add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, 
Pair.of(jobStatus, exceptions)));
         } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
-            }
+            LOGGER.log(level, "NO NEED TO NOTIFY JOB FINISH!");
         }
     }
 
@@ -156,20 +150,16 @@
 
     @Override
     public IActiveEntityEventsListener getListener(EntityId entityId) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId 
entityId) was called with entity " + entityId);
-            IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
-            LOGGER.log(Level.WARNING, "Listener found: " + listener);
-        }
+        LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was 
called with entity " + entityId);
+        IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
+        LOGGER.log(level, "Listener found: " + listener);
         return entityEventListeners.get(entityId);
     }
 
     @Override
     public synchronized IActiveEntityEventsListener[] getEventListeners() {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "getEventListeners() was called");
-            LOGGER.log(Level.WARNING, "returning " + 
entityEventListeners.size() + " Listeners");
-        }
+        LOGGER.log(level, "getEventListeners() was called");
+        LOGGER.log(level, "returning " + entityEventListeners.size() + " 
Listeners");
         return entityEventListeners.values().toArray(new 
IActiveEntityEventsListener[entityEventListeners.size()]);
     }
 
@@ -178,11 +168,8 @@
         if (suspended) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING,
-                    "registerListener(IActiveEntityEventsListener listener) 
was called for the entity "
-                            + listener.getEntityId());
-        }
+        LOGGER.log(level, "registerListener(IActiveEntityEventsListener 
listener) was called for the entity "
+                + listener.getEntityId());
         if (entityEventListeners.containsKey(listener.getEntityId())) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, 
listener.getEntityId());
         }
@@ -194,7 +181,7 @@
         if (suspended) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
-        LOGGER.log(Level.FINE, "unregisterListener(IActiveEntityEventsListener 
listener) was called for the entity "
+        LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener 
listener) was called for the entity "
                 + listener.getEntityId());
         IActiveEntityEventsListener registeredListener = 
entityEventListeners.remove(listener.getEntityId());
         if (registeredListener == null) {
@@ -221,16 +208,16 @@
 
     @Override
     public synchronized void recover() throws HyracksDataException {
-        LOGGER.log(Level.FINE, "Starting active recovery");
+        LOGGER.log(level, "Starting active recovery");
         for (IActiveEntityEventsListener listener : 
entityEventListeners.values()) {
             synchronized (listener) {
-                LOGGER.log(Level.FINE, "Entity " + listener.getEntityId() + " 
is " + listener.getStats());
+                LOGGER.log(level, "Entity " + listener.getEntityId() + " is " 
+ listener.getStats());
                 if (listener.getState() == ActivityState.PERMANENTLY_FAILED
                         && listener instanceof IActiveEntityController) {
-                    LOGGER.log(Level.FINE, "Recovering");
+                    LOGGER.log(level, "Recovering");
                     ((IActiveEntityController) listener).recover();
                 } else {
-                    LOGGER.log(Level.FINE, "Only notifying");
+                    LOGGER.log(level, "Only notifying");
                     listener.notifyAll();
                 }
             }
@@ -243,7 +230,7 @@
             if (suspended) {
                 throw new 
RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
             }
-            LOGGER.log(Level.FINE, "Suspending active events handler");
+            LOGGER.log(level, "Suspending active events handler");
             suspended = true;
         }
         IMetadataLockManager lockManager = 
mdProvider.getApplicationContext().getMetadataLockManager();
@@ -253,27 +240,27 @@
             // exclusive lock all the datasets
             String dataverseName = listener.getEntityId().getDataverse();
             String entityName = listener.getEntityId().getEntityName();
-            LOGGER.log(Level.FINE, "Suspending " + listener.getEntityId());
-            LOGGER.log(Level.FINE, "Acquiring locks");
+            LOGGER.log(level, "Suspending " + listener.getEntityId());
+            LOGGER.log(level, "Acquiring locks");
             lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), 
dataverseName + '.' + entityName);
             List<Dataset> datasets = ((ActiveEntityEventsListener) 
listener).getDatasets();
             for (Dataset dataset : datasets) {
                 
lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
                         DatasetUtil.getFullyQualifiedName(dataset));
             }
-            LOGGER.log(Level.FINE, "locks acquired");
+            LOGGER.log(level, "locks acquired");
             ((ActiveEntityEventsListener) listener).suspend(mdProvider);
-            LOGGER.log(Level.FINE, listener.getEntityId() + " suspended");
+            LOGGER.log(level, listener.getEntityId() + " suspended");
         }
     }
 
     public void resume(MetadataProvider mdProvider)
             throws AsterixException, HyracksDataException, 
InterruptedException {
-        LOGGER.log(Level.FINE, "Resuming active events handler");
+        LOGGER.log(level, "Resuming active events handler");
         for (IActiveEntityEventsListener listener : 
entityEventListeners.values()) {
-            LOGGER.log(Level.FINE, "Resuming " + listener.getEntityId());
+            LOGGER.log(level, "Resuming " + listener.getEntityId());
             ((ActiveEntityEventsListener) listener).resume(mdProvider);
-            LOGGER.log(Level.FINE, listener.getEntityId() + " resumed");
+            LOGGER.log(level, listener.getEntityId() + " resumed");
         }
         synchronized (this) {
             suspended = false;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 09c4983..5503940 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -272,8 +273,8 @@
             }
 
             // make connections between operators
-            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, 
Integer>,
-                    Pair<IOperatorDescriptor, Integer>>> entry : 
subJob.getConnectorOperatorMap().entrySet()) {
+            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, 
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob
+                    .getConnectorOperatorMap().entrySet()) {
                 ConnectorDescriptorId newId = 
connectorIdMapping.get(entry.getKey());
                 IConnectorDescriptor connDesc = 
jobSpec.getConnectorMap().get(newId);
                 Pair<IOperatorDescriptor, Integer> leftOp = 
entry.getValue().getLeft();
@@ -381,7 +382,7 @@
 
     public static void SendStopMessageToNode(ICcApplicationContext appCtx, 
EntityId feedId, String intakeNodeLocation,
             Integer partition) throws Exception {
-        ActiveManagerMessage stopFeedMessage = new 
ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY,
+        ActiveManagerMessage stopFeedMessage = new 
ActiveManagerMessage(Kind.STOP_ACTIVITY,
                 new ActiveRuntimeId(feedId, 
FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
         SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation);
     }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
index 71cb038..74c4364 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
@@ -21,7 +21,7 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-abstract class Action {
+public abstract class Action {
     boolean done = false;
     HyracksDataException failure;
 
@@ -39,21 +39,21 @@
 
     protected abstract void doExecute(MetadataProvider mdProvider) throws 
Exception;
 
-    boolean hasFailed() {
+    public boolean hasFailed() {
         return failure != null;
     }
 
-    HyracksDataException getFailure() {
+    public HyracksDataException getFailure() {
         return failure;
     }
 
-    synchronized void sync() throws InterruptedException {
+    public synchronized void sync() throws InterruptedException {
         while (!done) {
             wait();
         }
     }
 
-    boolean isDone() {
+    public boolean isDone() {
         return done;
     }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index f8baa0e..e1fdb69 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.active.IActiveRuntime;
 import org.apache.asterix.active.NoRetryPolicyFactory;
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.algebra.base.ILangExtension.Language;
 import org.apache.asterix.app.active.ActiveEntityEventsListener;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -126,8 +127,8 @@
         requestedStats = eventsListener.getStats();
         Assert.assertTrue(requestedStats.contains("N/A"));
         // Fake partition message and notify eventListener
-        ActivePartitionMessage partitionMessage = new 
ActivePartitionMessage(activeRuntimeId, jobId,
-                ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null);
+        ActivePartitionMessage partitionMessage =
+                new ActivePartitionMessage(activeRuntimeId, jobId, 
Event.RUNTIME_REGISTERED, null);
         partitionMessage.handle(appCtx);
         start.sync();
         if (start.hasFailed()) {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index 3f68651..8d21b55 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -21,7 +21,7 @@
 import org.apache.asterix.active.SingleThreadEventProcessor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 
-class Actor extends SingleThreadEventProcessor<Action> {
+public class Actor extends SingleThreadEventProcessor<Action> {
 
     private final MetadataProvider actorMdProvider;
 
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
index e7e21b6..99499a3 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
@@ -23,6 +23,7 @@
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.hyracks.api.job.JobId;
 
@@ -41,9 +42,8 @@
         Action registration = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws 
Exception {
-                ActiveEvent event = new ActiveEvent(jobId, 
Kind.PARTITION_EVENT, entityId,
-                        new ActivePartitionMessage(new 
ActiveRuntimeId(entityId, id, partition), jobId,
-                                
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null));
+                ActiveEvent event = new ActiveEvent(jobId, 
Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
+                        new ActiveRuntimeId(entityId, id, partition), jobId, 
Event.RUNTIME_REGISTERED, null));
                 clusterController.activeEvent(event);
             }
         };
@@ -55,9 +55,8 @@
         Action registration = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws 
Exception {
-                ActiveEvent event = new ActiveEvent(jobId, 
Kind.PARTITION_EVENT, entityId,
-                        new ActivePartitionMessage(new 
ActiveRuntimeId(entityId, id, partition), jobId,
-                                
ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null));
+                ActiveEvent event = new ActiveEvent(jobId, 
Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
+                        new ActiveRuntimeId(entityId, id, partition), jobId, 
Event.RUNTIME_DEREGISTERED, null));
                 clusterController.activeEvent(event);
             }
         };
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 10b528f..b383317 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -534,7 +534,7 @@
         return executeQueryService(str, fmt, uri, params, jsonEncoded, 
responseCodeValidator, false);
     }
 
-    protected InputStream executeQueryService(String str, OutputFormat fmt, 
URI uri,
+    public InputStream executeQueryService(String str, OutputFormat fmt, URI 
uri,
             List<CompilationUnit.Parameter> params, boolean jsonEncoded, 
Predicate<Integer> responseCodeValidator,
             boolean cancellable) throws Exception {
         final List<CompilationUnit.Parameter> newParams = upsertParam(params, 
"format", fmt.mimeType());
@@ -1326,7 +1326,7 @@
                         if (failedGroup != null) {
                             
failedGroup.getTestCase().add(testCaseCtx.getTestCase());
                         }
-                        throw new Exception("Test \"" + testFile + "\" 
FAILED!");
+                        throw new Exception("Test \"" + testFile + "\" 
FAILED!", e);
                     }
                 } finally {
                     if (numOfFiles == testFileCtxs.size()) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 5b9b96f..d709845 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -193,6 +193,7 @@
                 try {
                     waitForSignal();
                 } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                     throw HyracksDataException.create(e);
                 }
             }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 1b5eeac..f423404 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -74,29 +74,31 @@
 
     public void stop() throws HyracksDataException, InterruptedException {
         synchronized (adapterExecutor) {
-            try {
-                if (started) {
-                    try {
-                        ctx.getExecutorService().submit(() -> {
-                            if (feedAdapter.stop()) {
-                                execution.get();
-                            }
-                            return null;
-                        }).get(30, TimeUnit.SECONDS);
-                    } catch (InterruptedException e) {
-                        LOGGER.log(Level.WARNING, "Interrupted while trying to 
stop an adapter runtime", e);
-                        throw e;
-                    } catch (Exception e) {
-                        LOGGER.log(Level.WARNING, "Exception while trying to 
stop an adapter runtime", e);
-                        throw HyracksDataException.create(e);
-                    } finally {
-                        execution.cancel(true);
+            if (!done) {
+                try {
+                    if (started) {
+                        try {
+                            ctx.getExecutorService().submit(() -> {
+                                if (feedAdapter.stop()) {
+                                    execution.get();
+                                }
+                                return null;
+                            }).get(30, TimeUnit.SECONDS);
+                        } catch (InterruptedException e) {
+                            LOGGER.log(Level.WARNING, "Interrupted while 
trying to stop an adapter runtime", e);
+                            throw e;
+                        } catch (Exception e) {
+                            LOGGER.log(Level.WARNING, "Exception while trying 
to stop an adapter runtime", e);
+                            throw HyracksDataException.create(e);
+                        } finally {
+                            execution.cancel(true);
+                        }
+                    } else {
+                        LOGGER.log(Level.WARNING, "Adapter executor was 
stopped before it starts");
                     }
-                } else {
-                    LOGGER.log(Level.WARNING, "Adapter executor was stopped 
before it starts");
+                } finally {
+                    done = true;
                 }
-            } finally {
-                done = true;
             }
         }
     }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 4717a7b..3b23d67 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -389,10 +389,11 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering intention to remove node id " + nodeId);
         }
-        if (!activeNcConfiguration.containsKey(nodeId)) {
+        if (activeNcConfiguration.containsKey(nodeId)) {
+            pendingRemoval.add(nodeId);
+        } else {
             LOGGER.warning("Cannot register unknown node " + nodeId + " for 
pending removal");
         }
-        pendingRemoval.add(nodeId);
     }
 
     public synchronized boolean cancelRemovePending(String nodeId) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
 
b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
index 8394057..99d9f3d 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.algebricks.common.constraints;
 
+import java.util.Arrays;
+
 public class AlgebricksAbsolutePartitionConstraint extends 
AlgebricksPartitionConstraint {
     private final String[] locations;
 
@@ -33,4 +35,10 @@
     public String[] getLocations() {
         return locations;
     }
+
+    @Override
+    public String toString() {
+        return Arrays.deepToString(locations);
+    }
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
index c9cc71e..2c1ce37 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.api.test;
 
 import java.util.Collection;
+import java.util.Collections;
 
 public class FrameWriterTestUtils {
     public static final String EXCEPTION_MESSAGE = "IFrameWriter Exception in 
the call to the method ";
@@ -32,6 +33,10 @@
         Close
     }
 
+    public static TestFrameWriter create() {
+        return create(Collections.emptyList(), Collections.emptyList(), false);
+    }
+
     public static TestFrameWriter create(Collection<FrameWriterOperation> 
exceptionThrowingOperations,
             Collection<FrameWriterOperation> errorThrowingOperations, boolean 
deepCopyInputFrames) {
         CountAnswer openAnswer =
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 2685f60..7a9306c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -151,12 +151,18 @@
 
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> 
exceptions) {
+        LOGGER.log(Level.INFO, "job " + jobId + " failed and is being reported 
to " + getClass().getSimpleName(),
+                exceptions.get(0));
         DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        LOGGER.log(Level.INFO, "Dataset job record is " + djr);
         if (djr != null) {
+            LOGGER.log(Level.INFO, "Setting exceptions in Dataset job record");
             djr.fail(exceptions);
         }
         final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
+        LOGGER.log(Level.INFO, "Job result info is " + jobResultInfo);
         if (jobResultInfo != null) {
+            LOGGER.log(Level.INFO, "Setting exceptions in Job result info");
             jobResultInfo.setException(exceptions.isEmpty() ? null : 
exceptions.get(0));
         }
         notifyAll();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index f18a917..dbbaf9f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -66,7 +66,6 @@
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 
-
 public class JobExecutor {
     private static final Logger LOGGER = 
Logger.getLogger(JobExecutor.class.getName());
 
@@ -190,11 +189,11 @@
 
     private void startRunnableActivityClusters() throws HyracksException {
         Set<TaskCluster> taskClusterRoots = new HashSet<>();
-        findRunnableTaskClusterRoots(taskClusterRoots, 
jobRun.getActivityClusterGraph().getActivityClusterMap()
-                .values());
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", 
inProgressTaskClusters: "
-                    + inProgressTaskClusters);
+        findRunnableTaskClusterRoots(taskClusterRoots,
+                
jobRun.getActivityClusterGraph().getActivityClusterMap().values());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.log(Level.INFO,
+                    "Runnable TC roots: " + taskClusterRoots + ", 
inProgressTaskClusters: " + inProgressTaskClusters);
         }
         if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
             ccs.getWorkQueue()
@@ -344,8 +343,8 @@
         for (int i = 0; i < tasks.length; ++i) {
             Task ts = tasks[i];
             TaskId tid = ts.getTaskId();
-            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new 
TaskAttemptId(new TaskId(tid.getActivityId(),
-                    tid.getPartition()), attempts), ts);
+            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt,
+                    new TaskAttemptId(new TaskId(tid.getActivityId(), 
tid.getPartition()), attempts), ts);
             taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
             locationMap.put(tid,
                     new 
PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), 
tid.getPartition()));
@@ -496,8 +495,8 @@
         final DeploymentId deploymentId = jobRun.getDeploymentId();
         final JobId jobId = jobRun.getJobId();
         final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
-        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = 
new HashMap<>(
-                jobRun.getConnectorPolicyMap());
+        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies =
+                new HashMap<>(jobRun.getConnectorPolicyMap());
         INodeManager nodeManager = ccs.getNodeManager();
         try {
             byte[] acgBytes = predistributed ? null : 
JavaSerializationUtils.serialize(acg);
@@ -555,14 +554,14 @@
             }
         }
         final JobId jobId = jobRun.getJobId();
-        LOGGER.fine("Abort map for job: " + jobId + ": " + 
abortTaskAttemptMap);
+        LOGGER.info("Abort map for job: " + jobId + ": " + 
abortTaskAttemptMap);
         INodeManager nodeManager = ccs.getNodeManager();
         for (Map.Entry<String, List<TaskAttemptId>> entry : 
abortTaskAttemptMap.entrySet()) {
             final NodeControllerState node = 
nodeManager.getNodeControllerState(entry.getKey());
             final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
             if (node != null) {
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + 
entry.getKey());
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Aborting: " + abortTaskAttempts + " at " + 
entry.getKey());
                 }
                 try {
                     node.getNodeController().abortTasks(jobId, 
abortTaskAttempts);
@@ -582,6 +581,7 @@
     }
 
     private void abortDoomedTaskClusters() throws HyracksException {
+        LOGGER.log(Level.INFO, "aborting doomed task clusters");
         Set<TaskCluster> doomedTaskClusters = new HashSet<>();
         for (TaskCluster tc : inProgressTaskClusters) {
             // Start search at TCs that produce no outputs (sinks)
@@ -590,6 +590,7 @@
             }
         }
 
+        LOGGER.log(Level.INFO, "number of doomed task clusters found = " + 
doomedTaskClusters.size());
         for (TaskCluster tc : doomedTaskClusters) {
             TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
             if (tca != null) {
@@ -628,7 +629,7 @@
             if ((maxState == null
                     || (cPolicy.consumerWaitsForProducerToFinish() && maxState 
!= PartitionState.COMMITTED))
                     && 
findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), 
doomedTaskClusters)) {
-                    doomed = true;
+                doomed = true;
             }
         }
         if (doomed) {
@@ -663,28 +664,36 @@
 
     /**
      * Indicates that a single task attempt has encountered a failure.
-     * @param ta Failed Task Attempt
-     * @param exceptions exeptions thrown during the failure
+     *
+     * @param ta
+     *            Failed Task Attempt
+     * @param exceptions
+     *            exceptions thrown during the failure
      */
     public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
         try {
-            LOGGER.fine("Received failure notification for TaskAttempt " + 
ta.getTaskAttemptId());
+            LOGGER.log(Level.INFO, "Received failure notification for 
TaskAttempt " + ta.getTaskAttemptId());
             TaskAttemptId taId = ta.getTaskAttemptId();
             TaskCluster tc = ta.getTask().getTaskCluster();
             TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
             if (lastAttempt != null && taId.getAttempt() == 
lastAttempt.getAttempt()) {
-                LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " 
as failed");
+                LOGGER.log(Level.INFO, "Marking TaskAttempt " + 
ta.getTaskAttemptId() + " as failed");
                 ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
                 abortTaskCluster(lastAttempt, 
TaskClusterAttempt.TaskClusterStatus.FAILED);
                 abortDoomedTaskClusters();
-                if (lastAttempt.getAttempt() >= 
jobRun.getActivityClusterGraph().getMaxReattempts() || isCancelled()) {
+                int maxReattempts = 
jobRun.getActivityClusterGraph().getMaxReattempts();
+                LOGGER.log(Level.INFO, "Marking TaskAttempt " + 
ta.getTaskAttemptId()
+                        + " as failed and the number of max re-attempts = " + 
maxReattempts);
+                if (lastAttempt.getAttempt() >= maxReattempts || 
isCancelled()) {
+                    LOGGER.log(Level.INFO, "Aborting the job of " + 
ta.getTaskAttemptId());
                     abortJob(exceptions);
                     return;
                 }
+                LOGGER.log(Level.INFO, "We will try to start runnable activity 
clusters of " + ta.getTaskAttemptId());
                 startRunnableActivityClusters();
             } else {
-                LOGGER.warning("Ignoring task failure notification: " + taId + 
" -- Current last attempt = "
-                        + lastAttempt);
+                LOGGER.warning(
+                        "Ignoring task failure notification: " + taId + " -- 
Current last attempt = " + lastAttempt);
             }
         } catch (Exception e) {
             abortJob(Collections.singletonList(e));
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 486e9c6..8f50087 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -19,6 +19,8 @@
 package org.apache.hyracks.control.cc.work;
 
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
@@ -28,6 +30,7 @@
 import org.apache.hyracks.control.cc.job.TaskAttempt;
 
 public class TaskFailureWork extends AbstractTaskLifecycleWork {
+    private static final Logger LOGGER = Logger.getLogger(TaskFailureWork.class.getName());
     private final List<Exception> exceptions;
 
     public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -38,6 +41,7 @@
 
     @Override
     protected void performEvent(TaskAttempt ta) {
+        LOGGER.log(Level.WARNING, "Executing task failure work for " + this, exceptions.get(0));
         IJobManager jobManager = ccs.getJobManager();
         JobRun run = jobManager.get(jobId);
         ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 7f5302a..4f5b556 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -28,9 +28,6 @@
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.application.NCServiceContext;
 
-/**
- * @author rico
- */
 public class ApplicationMessageWork extends AbstractWork {
     private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
     private byte[] message;
@@ -63,6 +60,6 @@
 
     @Override
     public String toString() {
-        return getName() + ": nodeID: " + nodeId;
+        return getName() + ": nodeId: " + nodeId;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index f4ee6b0..7728d16 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -18,12 +18,16 @@
  */
 package org.apache.hyracks.control.nc.work;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
 
 public class NotifyTaskCompleteWork extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(NotifyTaskCompleteWork.class.getName());
     private final NodeControllerService ncs;
     private final Task task;
 
@@ -40,8 +44,13 @@
             ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
                     ncs.getId(), taskProfile);
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.SEVERE, "Failed notifying task complete for " + task.getTaskAttemptId(), e);
         }
         task.getJoblet().removeTask(task);
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + ":" + task.getTaskAttemptId();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index fa8ba28..7ed2c09 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -35,7 +35,6 @@
     private final Task task;
     private final JobId jobId;
     private final TaskAttemptId taskId;
-
     private final List<Exception> exceptions;
 
     public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions, JobId jobId,
@@ -49,6 +48,8 @@
 
     @Override
     public void run() {
+        LOGGER.log(Level.WARNING, ncs.getId() + " is sending a notification to cc that task " + taskId + " has failed",
+                exceptions.get(0));
         try {
             IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
             if (dpm != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index d9ab210..dea48bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -74,6 +74,7 @@
         } catch (IOException e) {
             throw new IPCException(e);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new IPCException(e);
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
index a19e69a..157450a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.storage.am.common.freepage;
 
+import java.nio.charset.StandardCharsets;
+
 import org.apache.hyracks.data.std.api.IValueReference;
 
 public class MutableArrayValueReference implements IValueReference {
@@ -46,4 +48,9 @@
         return array == null ? 0 : array.length;
     }
 
+    @Override
+    public String toString() {
+        return new String(array, StandardCharsets.UTF_8);
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index eb8ec92..33bb60e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -63,4 +63,9 @@
     public int getFileReferenceCount() {
         return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
index 57b9092..0ba7c30 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
@@ -74,4 +74,8 @@
         return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
     }
 
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
index 6ccbc8d..40017d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
@@ -19,6 +19,8 @@
 package org.apache.hyracks.storage.am.lsm.common.utils;
 
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
@@ -32,8 +34,8 @@
 
 public class ComponentMetadataUtil {
 
-    public static final MutableArrayValueReference MARKER_LSN_KEY =
-            new MutableArrayValueReference("Marker".getBytes());
+    private static final Logger LOGGER = Logger.getLogger(ComponentMetadataUtil.class.getName());
+    public static final MutableArrayValueReference MARKER_LSN_KEY = new MutableArrayValueReference("Marker".getBytes());
     public static final long NOT_FOUND = -1L;
 
     private ComponentMetadataUtil() {
@@ -71,16 +73,28 @@
      * @throws HyracksDataException
      */
     public static void get(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException {
+        LOGGER.log(Level.INFO, "Getting " + key + " from index " + index);
         // Lock the opTracker to ensure index components don't change
         synchronized (index.getOperationTracker()) {
             index.getCurrentMemoryComponent().getMetadata().get(key, pointable);
             if (pointable.getLength() == 0) {
+                LOGGER.log(Level.INFO, key + " was not found in mutable memory component of " + index);
                 // was not found in the in current mutable component, search in the other in memory components
                 fromImmutableMemoryComponents(index, key, pointable);
                 if (pointable.getLength() == 0) {
+                    LOGGER.log(Level.INFO, key + " was not found in all immmutable memory components of " + index);
                     // was not found in the in all in memory components, search in the disk components
                     fromDiskComponents(index, key, pointable);
+                    if (pointable.getLength() == 0) {
+                        LOGGER.log(Level.INFO, key + " was not found in all disk components of " + index);
+                    } else {
+                        LOGGER.log(Level.INFO, key + " was found in disk components of " + index);
+                    }
+                } else {
+                    LOGGER.log(Level.INFO, key + " was found in the immutable memory components of " + index);
                 }
+            } else {
+                LOGGER.log(Level.INFO, key + " was found in mutable memory component of " + index);
             }
         }
     }
@@ -105,7 +119,9 @@
 
     private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable)
             throws HyracksDataException {
+        LOGGER.log(Level.INFO, "Getting " + key + " from disk components of " + index);
         for (ILSMDiskComponent c : index.getImmutableComponents()) {
+            LOGGER.log(Level.INFO, "Getting " + key + " from disk components " + c);
             c.getMetadata().get(key, pointable);
             if (pointable.getLength() != 0) {
                 // Found
@@ -115,10 +131,13 @@
     }
 
     private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, IPointable pointable) {
+        LOGGER.log(Level.INFO, "Getting " + key + " from immutable memory components of " + index);
         List<ILSMMemoryComponent> memComponents = index.getMemoryComponents();
         int numOtherMemComponents = memComponents.size() - 1;
         int next = index.getCurrentMemoryComponentIndex();
+        LOGGER.log(Level.INFO, index + " has " + numOtherMemComponents + " immutable memory components");
         for (int i = 0; i < numOtherMemComponents; i++) {
+            LOGGER.log(Level.INFO, "trying to get " + key + " from immutable memory components number: " + (i + 1));
             next = next - 1;
             if (next < 0) {
                 next = memComponents.size() - 1;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index f2b3284..2470a39 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -75,4 +75,9 @@
     public int getFileReferenceCount() {
         return deletedKeysBTree.getBufferCache().getFileReferenceCount(deletedKeysBTree.getFileId());
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + ":" + ((OnDiskInvertedIndex) invIndex).getInvListsFile().getRelativePath();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index 982f89b..54ef122 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -76,4 +76,9 @@
     public int getFileReferenceCount() {
         return rtree.getBufferCache().getFileReferenceCount(rtree.getFileId());
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + ":" + rtree.getFileReference().getRelativePath();
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1921
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7044896559798426c04a3f46861bc5335b25d140
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to