abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1921
Change subject: [ASTERIXDB-2008][CLUS] Only add pending removal if node known ...................................................................... [ASTERIXDB-2008][CLUS] Only add pending removal if node known [ASTERIXDB-2023][ING] Introduce Enums instead of using bytes - user model changes: no - storage format changes: no - interface changes: no details: - Only nodes which are known to cluster manager are added to the list of nodes pending removal. Other nodes are ignored - Enums introduced: - ActiveEvent.Kind - ActivePartitionMessage.Event Change-Id: I7044896559798426c04a3f46861bc5335b25d140 --- M asterixdb/asterix-active/pom.xml M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java M hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java M hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java 33 files changed, 255 insertions(+), 172 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/21/1921/1 diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml index 3dd24b6..6568795 100644 --- a/asterixdb/asterix-active/pom.xml +++ b/asterixdb/asterix-active/pom.xml @@ -31,10 +31,6 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </dependency> - <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-api</artifactId> </dependency> diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index fcf2be9..1dbacf5 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -26,6 +26,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.active.message.ActiveStatsResponse; @@ -38,7 +40,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.nc.NodeControllerService; -import org.apache.log4j.Logger; public class ActiveManager { @@ -86,15 +87,16 @@ } public void submit(ActiveManagerMessage message) throws HyracksDataException { + LOGGER.log(Level.INFO, "Message of type " + message.getKind() + " received in " + nodeId); switch (message.getKind()) { - case ActiveManagerMessage.STOP_ACTIVITY: + case STOP_ACTIVITY: stopRuntime(message); break; - case ActiveManagerMessage.REQUEST_STATS: + case REQUEST_STATS: requestStats((StatsRequestMessage) message); break; default: - LOGGER.warn("Unknown message type received: " + message.getKind()); + LOGGER.warning("Unknown message type received: " + message.getKind()); } } @@ -104,7 +106,7 @@ IActiveRuntime runtime = runtimes.get(runtimeId); long reqId = message.getReqId(); if (runtime == null) { - LOGGER.warn("Request stats of a runtime that is not registered " + runtimeId); + LOGGER.warning("Request stats of a runtime that is not registered " + runtimeId); // Send a failure message ((NodeControllerService) serviceCtx.getControllerService()) .sendApplicationMessageToCC( @@ -124,7 +126,7 @@ } public void shutdown() { - LOGGER.warn("Shutting down ActiveManager on node " + nodeId); + LOGGER.warning("Shutting down ActiveManager on node " + nodeId); Map<ActiveRuntimeId, Future<Void>> stopFutures = new HashMap<>(); shutdown = true; runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId, executor.submit(() -> { @@ -136,29 +138,29 @@ try { entry.getValue().get(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOGGER.warn("Interrupted waiting to stop runtime: " + entry.getKey()); + LOGGER.warning("Interrupted waiting to stop runtime: " + entry.getKey()); Thread.currentThread().interrupt(); } catch (ExecutionException e) { - LOGGER.warn("Exception while stopping runtime: " + entry.getKey(), e); + LOGGER.log(Level.WARNING, "Exception while stopping runtime: " + entry.getKey(), e); } catch (TimeoutException e) { - LOGGER.warn("Timed out waiting to stop runtime: " + entry.getKey(), e); + LOGGER.log(Level.WARNING, "Timed out waiting to stop runtime: " + entry.getKey(), e); } }); - LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete"); + LOGGER.warning("Shutdown ActiveManager on node " + nodeId + " complete"); } private void stopRuntime(ActiveManagerMessage message) { ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload(); IActiveRuntime runtime = runtimes.get(runtimeId); if (runtime == null) { - LOGGER.warn("Request to stop a runtime that is not registered " + runtimeId); + LOGGER.warning("Request to stop a runtime that is not registered " + runtimeId); } else { executor.execute(() -> { try { stopIfRunning(runtimeId, runtime); } catch (Exception e) { // TODO(till) Figure out a better way to handle failure to stop a runtime - LOGGER.warn("Failed to stop runtime: " + runtimeId, e); + LOGGER.log(Level.WARNING, "Failed to stop runtime: " + runtimeId, e); } }); } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java index a7d7796..adef590 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java @@ -22,6 +22,7 @@ import java.util.logging.Logger; import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -89,7 +90,7 @@ try { // notify cc that runtime has been registered ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), - ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null), null); + Event.RUNTIME_REGISTERED, null), null); start(); } catch (InterruptedException e) { LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e); @@ -112,7 +113,7 @@ activeManager.deregisterRuntime(runtimeId); try { ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), - ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null), null); + Event.RUNTIME_DEREGISTERED, null), null); } catch (Exception e) { LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e); throw HyracksDataException.create(e); diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java index 0a36216..6ce696a 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java @@ -27,6 +27,7 @@ import java.util.logging.Logger; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.hyracks.api.exceptions.HyracksDataException; public abstract class SingleThreadEventProcessor<T> implements Runnable { @@ -71,8 +72,8 @@ public void stop() throws HyracksDataException, InterruptedException { future.cancel(true); executorService.shutdown(); - if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { - throw HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name); + if (!executorService.awaitTermination(60, TimeUnit.MINUTES)) { + throw new RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name); } } } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java index 9772698..bef418b 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java @@ -26,14 +26,16 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class ActiveManagerMessage implements INcAddressedMessage { - public static final byte STOP_ACTIVITY = 0x00; - public static final byte REQUEST_STATS = 0x01; + public enum Kind { + STOP_ACTIVITY, + REQUEST_STATS + } private static final long serialVersionUID = 1L; - private final byte kind; + private final Kind kind; private final Serializable payload; - public ActiveManagerMessage(byte kind, Serializable payload) { + public ActiveManagerMessage(Kind kind, Serializable payload) { this.kind = kind; this.payload = payload; } @@ -42,7 +44,7 @@ return payload; } - public byte getKind() { + public Kind getKind() { return kind; } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java index a47d5a5..9ace417 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java @@ -29,17 +29,19 @@ import org.apache.hyracks.api.job.JobId; public class ActivePartitionMessage implements ICcAddressedMessage { + public enum Event { + RUNTIME_REGISTERED, + RUNTIME_DEREGISTERED, + GENERIC_EVENT + } private static final long serialVersionUID = 1L; - public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00; - public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01; - public static final byte GENERIC_EVENT = 0x02; private final ActiveRuntimeId activeRuntimeId; private final JobId jobId; private final Serializable payload; - private final byte event; + private final Event event; - public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, byte event, Serializable payload) { + public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, Event event, Serializable payload) { this.activeRuntimeId = activeRuntimeId; this.jobId = jobId; this.event = event; @@ -58,7 +60,7 @@ return payload; } - public byte getEvent() { + public Event getEvent() { return event; } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java index 8fa5f19..d43f00e 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java @@ -24,8 +24,8 @@ private static final long serialVersionUID = 1L; private final long reqId; - public StatsRequestMessage(byte kind, Serializable payload, long reqId) { - super(kind, payload); + public StatsRequestMessage(Serializable payload, long reqId) { + super(Kind.REQUEST_STATS, payload); this.reqId = reqId; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 995e372..493cc0a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -38,8 +38,8 @@ import org.apache.asterix.active.IRetryPolicy; import org.apache.asterix.active.IRetryPolicyFactory; import org.apache.asterix.active.NoRetryPolicyFactory; -import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.active.message.StatsRequestMessage; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.api.IMetadataLockManager; @@ -68,6 +68,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityController { private static final Logger LOGGER = Logger.getLogger(ActiveEntityEventsListener.class.getName()); + private static final Level level = Level.WARNING; private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, Kind.STATE_CHANGED, null, null); private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING, ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING); @@ -130,7 +131,7 @@ } protected synchronized void setState(ActivityState newState) { - LOGGER.log(Level.FINE, "State is being set to " + newState + " from " + state); + LOGGER.log(level, "State of " + getEntityId() + "is being set to " + newState + " from " + state); this.prevState = state; this.state = newState; if (newState == ActivityState.SUSPENDED) { @@ -142,7 +143,7 @@ @Override public synchronized void notify(ActiveEvent event) { try { - LOGGER.fine("EventListener is notified."); + LOGGER.log(level, "EventListener is notified."); ActiveEvent.Kind eventKind = event.getEventKind(); switch (eventKind) { case JOB_CREATED: @@ -172,22 +173,24 @@ } protected synchronized void handle(ActivePartitionMessage message) { - if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) { + if (message.getEvent() == Event.RUNTIME_REGISTERED) { numRegistered++; if (numRegistered == locations.getLocations().length) { setState(ActivityState.RUNNING); } - } else if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED) { + } else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) { numRegistered--; } } @SuppressWarnings("unchecked") protected void finish(ActiveEvent event) throws HyracksDataException { + LOGGER.log(level, "the job " + jobId + " finished"); jobId = null; Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject(); JobStatus jobStatus = status.getLeft(); List<Exception> exceptions = status.getRight(); + LOGGER.log(level, "The job finished with status: " + jobStatus); if (jobStatus.equals(JobStatus.FAILURE)) { jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) : exceptions.get(0); @@ -271,10 +274,10 @@ @SuppressWarnings("unchecked") @Override public void refreshStats(long timeout) throws HyracksDataException { - LOGGER.log(Level.FINE, "refreshStats called"); + LOGGER.log(level, "refreshStats called"); synchronized (this) { if (state != ActivityState.RUNNING || isFetchingStats) { - LOGGER.log(Level.FINE, + LOGGER.log(level, "returning immediately since state = " + state + " and fetchingStats = " + isFetchingStats); return; } else { @@ -287,8 +290,7 @@ List<INcAddressedMessage> requests = new ArrayList<>(); List<String> ncs = Arrays.asList(locations.getLocations()); for (int i = 0; i < ncs.size(); i++) { - requests.add(new StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS, - new ActiveRuntimeId(entityId, runtimeName, i), reqId)); + requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId)); } try { List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout); @@ -348,32 +350,32 @@ @Override public synchronized void recover() throws HyracksDataException { - LOGGER.log(Level.FINE, "Recover is called on " + entityId); + LOGGER.log(level, "Recover is called on " + entityId); if (recoveryTask != null) { - LOGGER.log(Level.FINE, "But recovery task for " + entityId + " is already there!! throwing an exception"); + LOGGER.log(level, "But recovery task for " + entityId + " is already there!! throwing an exception"); throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS); } if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { - LOGGER.log(Level.FINE, "But it has no recovery policy, so it is set to permanent failure"); + LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure"); setState(ActivityState.PERMANENTLY_FAILED); } else { ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor(); IRetryPolicy policy = retryPolicyFactory.create(this); cancelRecovery = false; setState(ActivityState.TEMPORARILY_FAILED); - LOGGER.log(Level.FINE, "Recovery task has been submitted"); + LOGGER.log(level, "Recovery task has been submitted"); recoveryTask = executor.submit(() -> doRecover(policy)); } } protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, HyracksDataException, InterruptedException { - LOGGER.log(Level.FINE, "Actual Recovery task has started"); + LOGGER.log(level, "Actual Recovery task has started"); if (getState() != ActivityState.TEMPORARILY_FAILED) { - LOGGER.log(Level.FINE, "but its state is not temp failure and so we're just returning"); + LOGGER.log(level, "but its state is not temp failure and so we're just returning"); return null; } - LOGGER.log(Level.FINE, "calling the policy"); + LOGGER.log(level, "calling the policy"); while (policy.retry()) { synchronized (this) { if (cancelRecovery) { @@ -402,7 +404,7 @@ doStart(metadataProvider); return null; } catch (Exception e) { - LOGGER.log(Level.WARNING, "Attempt to revive " + entityId + " failed", e); + LOGGER.log(level, "Attempt to revive " + entityId + " failed", e); setState(ActivityState.TEMPORARILY_FAILED); recoverFailure = e; } finally { @@ -515,10 +517,10 @@ WaitForStateSubscriber subscriber; Future<Void> suspendTask; synchronized (this) { - LOGGER.log(Level.FINE, "suspending entity " + entityId); - LOGGER.log(Level.FINE, "Waiting for ongoing activities"); + LOGGER.log(level, "suspending entity " + entityId); + LOGGER.log(level, "Waiting for ongoing activities"); waitForNonTransitionState(); - LOGGER.log(Level.FINE, "Proceeding with suspension. Current state is " + state); + LOGGER.log(level, "Proceeding with suspension. Current state is " + state); if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) { suspended = true; return; @@ -536,12 +538,12 @@ EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED)); suspendTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService() .getExecutor().submit(() -> doSuspend(metadataProvider)); - LOGGER.log(Level.FINE, "Suspension task has been submitted"); + LOGGER.log(level, "Suspension task has been submitted"); } try { - LOGGER.log(Level.FINE, "Waiting for suspension task to complete"); + LOGGER.log(level, "Waiting for suspension task to complete"); suspendTask.get(); - LOGGER.log(Level.FINE, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED"); + LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED"); subscriber.sync(); } catch (Exception e) { synchronized (this) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index c5e5dbb..5b576ac 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -53,7 +53,7 @@ implements IActiveNotificationHandler, IJobLifecycleListener { private static final Logger LOGGER = Logger.getLogger(ActiveNotificationHandler.class.getName()); - private static final boolean DEBUG = false; + private static final Level level = Level.WARNING; public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob"; private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners; private final Map<JobId, EntityId> jobId2EntityId; @@ -73,13 +73,13 @@ EntityId entityId = jobId2EntityId.get(event.getJobId()); if (entityId != null) { IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - LOGGER.log(Level.FINE, "Next event is of type " + event.getEventKind()); + LOGGER.log(level, "Next event is of type " + event.getEventKind()); if (event.getEventKind() == Kind.JOB_FINISHED) { - LOGGER.log(Level.FINE, "Removing the job"); + LOGGER.log(level, "Removing the job"); jobId2EntityId.remove(event.getJobId()); } if (listener != null) { - LOGGER.log(Level.FINE, "Notifying the listener"); + LOGGER.log(level, "Notifying the listener"); listener.notify(event); } } else { @@ -91,34 +91,30 @@ @Override public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException { - LOGGER.log(Level.FINE, + LOGGER.log(level, "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " + jobId); Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); if (property == null || !(property instanceof EntityId)) { - LOGGER.log(Level.FINE, "Job is not of type active job. property found to be: " + property); + LOGGER.log(level, "Job is not of type active job. property found to be: " + property); return; } EntityId entityId = (EntityId) property; monitorJob(jobId, entityId); boolean found = jobId2EntityId.get(jobId) != null; - LOGGER.log(Level.FINE, "Job was found to be: " + (found ? "Active" : "Inactive")); + LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive")); add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification)); } private synchronized void monitorJob(JobId jobId, EntityId entityId) { - if (DEBUG) { - LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId); - boolean found = jobId2EntityId.get(jobId) != null; - LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive")); - } + LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId); + boolean found = jobId2EntityId.get(jobId) != null; + LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive")); if (entityEventListeners.containsKey(entityId)) { if (jobId2EntityId.containsKey(jobId)) { LOGGER.severe("Job is already being monitored for job: " + jobId); return; } - if (DEBUG) { - LOGGER.log(Level.WARNING, "monitoring started for job id: " + jobId); - } + LOGGER.log(level, "monitoring started for job id: " + jobId); } else { LOGGER.info("No listener was found for the entity: " + entityId); } @@ -140,9 +136,7 @@ if (entityId != null) { add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions))); } else { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("NO NEED TO NOTIFY JOB FINISH!"); - } + LOGGER.log(level, "NO NEED TO NOTIFY JOB FINISH!"); } } @@ -156,20 +150,16 @@ @Override public IActiveEntityEventsListener getListener(EntityId entityId) { - if (DEBUG) { - LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); - IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - LOGGER.log(Level.WARNING, "Listener found: " + listener); - } + LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); + IActiveEntityEventsListener listener = entityEventListeners.get(entityId); + LOGGER.log(level, "Listener found: " + listener); return entityEventListeners.get(entityId); } @Override public synchronized IActiveEntityEventsListener[] getEventListeners() { - if (DEBUG) { - LOGGER.log(Level.WARNING, "getEventListeners() was called"); - LOGGER.log(Level.WARNING, "returning " + entityEventListeners.size() + " Listeners"); - } + LOGGER.log(level, "getEventListeners() was called"); + LOGGER.log(level, "returning " + entityEventListeners.size() + " Listeners"); return entityEventListeners.values().toArray(new IActiveEntityEventsListener[entityEventListeners.size()]); } @@ -178,11 +168,8 @@ if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED); } - if (DEBUG) { - LOGGER.log(Level.WARNING, - "registerListener(IActiveEntityEventsListener listener) was called for the entity " - + listener.getEntityId()); - } + LOGGER.log(level, "registerListener(IActiveEntityEventsListener listener) was called for the entity " + + listener.getEntityId()); if (entityEventListeners.containsKey(listener.getEntityId())) { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId()); } @@ -194,7 +181,7 @@ if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED); } - LOGGER.log(Level.FINE, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity " + LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity " + listener.getEntityId()); IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId()); if (registeredListener == null) { @@ -221,16 +208,16 @@ @Override public synchronized void recover() throws HyracksDataException { - LOGGER.log(Level.FINE, "Starting active recovery"); + LOGGER.log(level, "Starting active recovery"); for (IActiveEntityEventsListener listener : entityEventListeners.values()) { synchronized (listener) { - LOGGER.log(Level.FINE, "Entity " + listener.getEntityId() + " is " + listener.getStats()); + LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getStats()); if (listener.getState() == ActivityState.PERMANENTLY_FAILED && listener instanceof IActiveEntityController) { - LOGGER.log(Level.FINE, "Recovering"); + LOGGER.log(level, "Recovering"); ((IActiveEntityController) listener).recover(); } else { - LOGGER.log(Level.FINE, "Only notifying"); + LOGGER.log(level, "Only notifying"); listener.notifyAll(); } } @@ -243,7 +230,7 @@ if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED); } - LOGGER.log(Level.FINE, "Suspending active events handler"); + LOGGER.log(level, "Suspending active events handler"); suspended = true; } IMetadataLockManager lockManager = mdProvider.getApplicationContext().getMetadataLockManager(); @@ -253,27 +240,27 @@ // exclusive lock all the datasets String dataverseName = listener.getEntityId().getDataverse(); String entityName = listener.getEntityId().getEntityName(); - LOGGER.log(Level.FINE, "Suspending " + listener.getEntityId()); - LOGGER.log(Level.FINE, "Acquiring locks"); + LOGGER.log(level, "Suspending " + listener.getEntityId()); + LOGGER.log(level, "Acquiring locks"); lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName); List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets(); for (Dataset dataset : datasets) { lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(), DatasetUtil.getFullyQualifiedName(dataset)); } - LOGGER.log(Level.FINE, "locks acquired"); + LOGGER.log(level, "locks acquired"); ((ActiveEntityEventsListener) listener).suspend(mdProvider); - LOGGER.log(Level.FINE, listener.getEntityId() + " suspended"); + LOGGER.log(level, listener.getEntityId() + " suspended"); } } public void resume(MetadataProvider mdProvider) throws AsterixException, HyracksDataException, InterruptedException { - LOGGER.log(Level.FINE, "Resuming active events handler"); + LOGGER.log(level, "Resuming active events handler"); for (IActiveEntityEventsListener listener : entityEventListeners.values()) { - LOGGER.log(Level.FINE, "Resuming " + listener.getEntityId()); + LOGGER.log(level, "Resuming " + listener.getEntityId()); ((ActiveEntityEventsListener) listener).resume(mdProvider); - LOGGER.log(Level.FINE, listener.getEntityId() + " resumed"); + LOGGER.log(level, listener.getEntityId() + " resumed"); } synchronized (this) { suspended = false; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index 09c4983..5503940 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -31,6 +31,7 @@ import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.message.ActiveManagerMessage; +import org.apache.asterix.active.message.ActiveManagerMessage.Kind; import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; @@ -272,8 +273,8 @@ } // make connections between operators - for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, - Pair<IOperatorDescriptor, Integer>>> entry : subJob.getConnectorOperatorMap().entrySet()) { + for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob + .getConnectorOperatorMap().entrySet()) { ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey()); IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId); Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft(); @@ -381,7 +382,7 @@ public static void SendStopMessageToNode(ICcApplicationContext appCtx, EntityId feedId, String intakeNodeLocation, Integer partition) throws Exception { - ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, + ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(Kind.STOP_ACTIVITY, new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition)); SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java index 71cb038..74c4364 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java @@ -21,7 +21,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; -abstract class Action { +public abstract class Action { boolean done = false; HyracksDataException failure; @@ -39,21 +39,21 @@ protected abstract void doExecute(MetadataProvider mdProvider) throws Exception; - boolean hasFailed() { + public boolean hasFailed() { return failure != null; } - HyracksDataException getFailure() { + public HyracksDataException getFailure() { return failure; } - synchronized void sync() throws InterruptedException { + public synchronized void sync() throws InterruptedException { while (!done) { wait(); } } - boolean isDone() { + public boolean isDone() { return done; } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index f8baa0e..e1fdb69 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -30,6 +30,7 @@ import org.apache.asterix.active.IActiveRuntime; import org.apache.asterix.active.NoRetryPolicyFactory; import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.algebra.base.ILangExtension.Language; import org.apache.asterix.app.active.ActiveEntityEventsListener; import org.apache.asterix.app.active.ActiveNotificationHandler; @@ -126,8 +127,8 @@ requestedStats = eventsListener.getStats(); Assert.assertTrue(requestedStats.contains("N/A")); // Fake partition message and notify eventListener - ActivePartitionMessage partitionMessage = new ActivePartitionMessage(activeRuntimeId, jobId, - ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null); + ActivePartitionMessage partitionMessage = + new ActivePartitionMessage(activeRuntimeId, jobId, Event.RUNTIME_REGISTERED, null); partitionMessage.handle(appCtx); start.sync(); if (start.hasFailed()) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java index 3f68651..8d21b55 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java @@ -21,7 +21,7 @@ import org.apache.asterix.active.SingleThreadEventProcessor; import org.apache.asterix.metadata.declared.MetadataProvider; -class Actor extends SingleThreadEventProcessor<Action> { +public class Actor extends SingleThreadEventProcessor<Action> { private final MetadataProvider actorMdProvider; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java index e7e21b6..99499a3 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java @@ -23,6 +23,7 @@ import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.hyracks.api.job.JobId; @@ -41,9 +42,8 @@ Action registration = new Action() { @Override protected void doExecute(MetadataProvider actorMdProvider) throws Exception { - ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, - new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId, - ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null)); + ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage( + new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_REGISTERED, null)); clusterController.activeEvent(event); } }; @@ -55,9 +55,8 @@ Action registration = new Action() { @Override protected void doExecute(MetadataProvider actorMdProvider) throws Exception { - ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, - new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId, - ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null)); + ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage( + new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_DEREGISTERED, null)); clusterController.activeEvent(event); } }; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 10b528f..b383317 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -534,7 +534,7 @@ return executeQueryService(str, fmt, uri, params, jsonEncoded, responseCodeValidator, false); } - protected InputStream executeQueryService(String str, OutputFormat fmt, URI uri, + public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<CompilationUnit.Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception { final List<CompilationUnit.Parameter> newParams = upsertParam(params, "format", fmt.mimeType()); @@ -1326,7 +1326,7 @@ if (failedGroup != null) { failedGroup.getTestCase().add(testCaseCtx.getTestCase()); } - throw new Exception("Test \"" + testFile + "\" FAILED!"); + throw new Exception("Test \"" + testFile + "\" FAILED!", e); } } finally { if (numOfFiles == testFileCtxs.size()) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 5b9b96f..d709845 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -193,6 +193,7 @@ try { waitForSignal(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw HyracksDataException.create(e); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java index 1b5eeac..f423404 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java @@ -74,29 +74,31 @@ public void stop() throws HyracksDataException, InterruptedException { synchronized (adapterExecutor) { - try { - if (started) { - try { - ctx.getExecutorService().submit(() -> { - if (feedAdapter.stop()) { - execution.get(); - } - return null; - }).get(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.log(Level.WARNING, "Interrupted while trying to stop an adapter runtime", e); - throw e; - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Exception while trying to stop an adapter runtime", e); - throw HyracksDataException.create(e); - } finally { - execution.cancel(true); + if (!done) { + try { + if (started) { + try { + ctx.getExecutorService().submit(() -> { + if (feedAdapter.stop()) { + execution.get(); + } + return null; + }).get(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.log(Level.WARNING, "Interrupted while trying to stop an adapter runtime", e); + throw e; + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception while trying to stop an adapter runtime", e); + throw HyracksDataException.create(e); + } finally { + execution.cancel(true); + } + } else { + LOGGER.log(Level.WARNING, "Adapter executor was stopped before it starts"); } - } else { - LOGGER.log(Level.WARNING, "Adapter executor was stopped before it starts"); + } finally { + done = true; } - } finally { - done = true; } } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 4717a7b..3b23d67 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -389,10 +389,11 @@ if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Registering intention to remove node id " + nodeId); } - if (!activeNcConfiguration.containsKey(nodeId)) { + if (activeNcConfiguration.containsKey(nodeId)) { + pendingRemoval.add(nodeId); + } else { LOGGER.warning("Cannot register unknown node " + nodeId + " for pending removal"); } - pendingRemoval.add(nodeId); } public synchronized boolean cancelRemovePending(String nodeId) { diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java index 8394057..99d9f3d 100644 --- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java +++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.algebricks.common.constraints; +import java.util.Arrays; + public class AlgebricksAbsolutePartitionConstraint extends AlgebricksPartitionConstraint { private final String[] locations; @@ -33,4 +35,10 @@ public String[] getLocations() { return locations; } + + @Override + public String toString() { + return Arrays.deepToString(locations); + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java index c9cc71e..2c1ce37 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java @@ -19,6 +19,7 @@ package org.apache.hyracks.api.test; import java.util.Collection; +import java.util.Collections; public class FrameWriterTestUtils { public static final String EXCEPTION_MESSAGE = "IFrameWriter Exception in the call to the method "; @@ -32,6 +33,10 @@ Close } + public static TestFrameWriter create() { + return create(Collections.emptyList(), Collections.emptyList(), false); + } + public static TestFrameWriter create(Collection<FrameWriterOperation> exceptionThrowingOperations, Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) { CountAnswer openAnswer = diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index 2685f60..7a9306c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -151,12 +151,18 @@ @Override public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) { + LOGGER.log(Level.INFO, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(), + exceptions.get(0)); DatasetJobRecord djr = getDatasetJobRecord(jobId); + LOGGER.log(Level.INFO, "Dataset job record is " + djr); if (djr != null) { + LOGGER.log(Level.INFO, "Setting exceptions in Dataset job record"); djr.fail(exceptions); } final JobResultInfo jobResultInfo = jobResultLocations.get(jobId); + LOGGER.log(Level.INFO, "Job result info is " + jobResultInfo); if (jobResultInfo != null) { + LOGGER.log(Level.INFO, "Setting exceptions in Job result info"); jobResultInfo.setException(exceptions.isEmpty() ? null : exceptions.get(0)); } notifyAll(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index f18a917..dbbaf9f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -66,7 +66,6 @@ import org.apache.hyracks.control.common.job.PartitionState; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; - public class JobExecutor { private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName()); @@ -190,11 +189,11 @@ private void startRunnableActivityClusters() throws HyracksException { Set<TaskCluster> taskClusterRoots = new HashSet<>(); - findRunnableTaskClusterRoots(taskClusterRoots, jobRun.getActivityClusterGraph().getActivityClusterMap() - .values()); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: " - + inProgressTaskClusters); + findRunnableTaskClusterRoots(taskClusterRoots, + jobRun.getActivityClusterGraph().getActivityClusterMap().values()); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, + "Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: " + inProgressTaskClusters); } if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) { ccs.getWorkQueue() @@ -344,8 +343,8 @@ for (int i = 0; i < tasks.length; ++i) { Task ts = tasks[i]; TaskId tid = ts.getTaskId(); - TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(), - tid.getPartition()), attempts), ts); + TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, + new TaskAttemptId(new TaskId(tid.getActivityId(), tid.getPartition()), attempts), ts); taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null); locationMap.put(tid, new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition())); @@ -496,8 +495,8 @@ final DeploymentId deploymentId = jobRun.getDeploymentId(); final JobId jobId = jobRun.getJobId(); final ActivityClusterGraph acg = jobRun.getActivityClusterGraph(); - final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<>( - jobRun.getConnectorPolicyMap()); + final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = + new HashMap<>(jobRun.getConnectorPolicyMap()); INodeManager nodeManager = ccs.getNodeManager(); try { byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg); @@ -555,14 +554,14 @@ } } final JobId jobId = jobRun.getJobId(); - LOGGER.fine("Abort map for job: " + jobId + ": " + abortTaskAttemptMap); + LOGGER.info("Abort map for job: " + jobId + ": " + abortTaskAttemptMap); INodeManager nodeManager = ccs.getNodeManager(); for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) { final NodeControllerState node = nodeManager.getNodeControllerState(entry.getKey()); final List<TaskAttemptId> abortTaskAttempts = entry.getValue(); if (node != null) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + entry.getKey()); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Aborting: " + abortTaskAttempts + " at " + entry.getKey()); } try { node.getNodeController().abortTasks(jobId, abortTaskAttempts); @@ -582,6 +581,7 @@ } private void abortDoomedTaskClusters() throws HyracksException { + LOGGER.log(Level.INFO, "aborting doomed task clusters"); Set<TaskCluster> doomedTaskClusters = new HashSet<>(); for (TaskCluster tc : inProgressTaskClusters) { // Start search at TCs that produce no outputs (sinks) @@ -590,6 +590,7 @@ } } + LOGGER.log(Level.INFO, "number of doomed task clusters found = " + doomedTaskClusters.size()); for (TaskCluster tc : doomedTaskClusters) { TaskClusterAttempt tca = findLastTaskClusterAttempt(tc); if (tca != null) { @@ -628,7 +629,7 @@ if ((maxState == null || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) && findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), doomedTaskClusters)) { - doomed = true; + doomed = true; } } if (doomed) { @@ -663,28 +664,36 @@ /** * Indicates that a single task attempt has encountered a failure. - * @param ta Failed Task Attempt - * @param exceptions exeptions thrown during the failure + * + * @param ta + * Failed Task Attempt + * @param exceptions + * exeptions thrown during the failure */ public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) { try { - LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId()); + LOGGER.log(Level.INFO, "Received failure notification for TaskAttempt " + ta.getTaskAttemptId()); TaskAttemptId taId = ta.getTaskAttemptId(); TaskCluster tc = ta.getTask().getTaskCluster(); TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc); if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) { - LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed"); + LOGGER.log(Level.INFO, "Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed"); ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions); abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED); abortDoomedTaskClusters(); - if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts() || isCancelled()) { + int maxReattempts = jobRun.getActivityClusterGraph().getMaxReattempts(); + LOGGER.log(Level.INFO, "Marking TaskAttempt " + ta.getTaskAttemptId() + + " as failed and the number of max re-attempts = " + maxReattempts); + if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) { + LOGGER.log(Level.INFO, "Aborting the job of " + ta.getTaskAttemptId()); abortJob(exceptions); return; } + LOGGER.log(Level.INFO, "We will try to start runnable activity clusters of " + ta.getTaskAttemptId()); startRunnableActivityClusters(); } else { - LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = " - + lastAttempt); + LOGGER.warning( + "Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt); } } catch (Exception e) { abortJob(Collections.singletonList(e)); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java index 486e9c6..8f50087 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java @@ -19,6 +19,8 @@ package org.apache.hyracks.control.cc.work; import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.job.JobId; @@ -28,6 +30,7 @@ import org.apache.hyracks.control.cc.job.TaskAttempt; public class TaskFailureWork extends AbstractTaskLifecycleWork { + private static final Logger LOGGER = Logger.getLogger(TaskFailureWork.class.getName()); private final List<Exception> exceptions; public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId, @@ -38,6 +41,7 @@ @Override protected void performEvent(TaskAttempt ta) { + LOGGER.log(Level.WARNING, "Executing task failure work for " + this, exceptions.get(0)); IJobManager jobManager = ccs.getJobManager(); JobRun run = jobManager.get(jobId); ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java index 7f5302a..4f5b556 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java @@ -28,9 +28,6 @@ import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.application.NCServiceContext; -/** - * @author rico - */ public class ApplicationMessageWork extends AbstractWork { private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName()); private byte[] message; @@ -63,6 +60,6 @@ @Override public String toString() { - return getName() + ": nodeID: " + nodeId; + return getName() + ": nodeId: " + nodeId; } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java index f4ee6b0..7728d16 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java @@ -18,12 +18,16 @@ */ package org.apache.hyracks.control.nc.work; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.Task; public class NotifyTaskCompleteWork extends AbstractWork { + private static final Logger LOGGER = Logger.getLogger(NotifyTaskCompleteWork.class.getName()); private final NodeControllerService ncs; private final Task task; @@ -40,8 +44,13 @@ ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(), ncs.getId(), taskProfile); } catch (Exception e) { - e.printStackTrace(); + LOGGER.log(Level.SEVERE, "Failed notifying task complete for " + task.getTaskAttemptId(), e); } task.getJoblet().removeTask(task); } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + task.getTaskAttemptId(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java index fa8ba28..7ed2c09 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java @@ -35,7 +35,6 @@ private final Task task; private final JobId jobId; private final TaskAttemptId taskId; - private final List<Exception> exceptions; public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions, JobId jobId, @@ -49,6 +48,8 @@ @Override public void run() { + LOGGER.log(Level.WARNING, ncs.getId() + " is sending a notification to cc that task " + taskId + " has failed", + exceptions.get(0)); try { IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager(); if (dpm != null) { diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java index d9ab210..dea48bd 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java @@ -74,6 +74,7 @@ } catch (IOException e) { throw new IPCException(e); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IPCException(e); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java index a19e69a..157450a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.storage.am.common.freepage; +import java.nio.charset.StandardCharsets; + import org.apache.hyracks.data.std.api.IValueReference; public class MutableArrayValueReference implements IValueReference { @@ -46,4 +48,9 @@ return array == null ? 0 : array.length; } + @Override + public String toString() { + return new String(array, StandardCharsets.UTF_8); + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java index eb8ec92..33bb60e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java @@ -63,4 +63,9 @@ public int getFileReferenceCount() { return btree.getBufferCache().getFileReferenceCount(btree.getFileId()); } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java index 57b9092..0ba7c30 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java @@ -74,4 +74,8 @@ return btree.getBufferCache().getFileReferenceCount(btree.getFileId()); } + @Override + public String toString() { + return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java index 6ccbc8d..40017d1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java @@ -19,6 +19,8 @@ package org.apache.hyracks.storage.am.lsm.common.utils; import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; @@ -32,8 +34,8 @@ public class ComponentMetadataUtil { - public static final MutableArrayValueReference MARKER_LSN_KEY = - new MutableArrayValueReference("Marker".getBytes()); + private static final Logger LOGGER = Logger.getLogger(ComponentMetadataUtil.class.getName()); + public static final MutableArrayValueReference MARKER_LSN_KEY = new MutableArrayValueReference("Marker".getBytes()); public static final long NOT_FOUND = -1L; private ComponentMetadataUtil() { @@ -71,16 +73,28 @@ * @throws HyracksDataException */ public static void get(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException { + LOGGER.log(Level.INFO, "Getting " + key + " from index " + index); // Lock the opTracker to ensure index components don't change synchronized (index.getOperationTracker()) { index.getCurrentMemoryComponent().getMetadata().get(key, pointable); if (pointable.getLength() == 0) { + LOGGER.log(Level.INFO, key + " was not found in mutable memory component of " + index); // was not found in the in current mutable component, search in the other in memory components fromImmutableMemoryComponents(index, key, pointable); if (pointable.getLength() == 0) { + LOGGER.log(Level.INFO, key + " was not found in all immmutable memory components of " + index); // was not found in the in all in memory components, search in the disk components fromDiskComponents(index, key, pointable); + if (pointable.getLength() == 0) { + LOGGER.log(Level.INFO, key + " was not found in all disk components of " + index); + } else { + LOGGER.log(Level.INFO, key + " was found in disk components of " + index); + } + } else { + LOGGER.log(Level.INFO, key + " was found in the immutable memory components of " + index); } + } else { + LOGGER.log(Level.INFO, key + " was found in mutable memory component of " + index); } } } @@ -105,7 +119,9 @@ private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException { + LOGGER.log(Level.INFO, "Getting " + key + " from disk components of " + index); for (ILSMDiskComponent c : index.getImmutableComponents()) { + LOGGER.log(Level.INFO, "Getting " + key + " from disk components " + c); c.getMetadata().get(key, pointable); if (pointable.getLength() != 0) { // Found @@ -115,10 +131,13 @@ } private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, IPointable pointable) { + LOGGER.log(Level.INFO, "Getting " + key + " from immutable memory components of " + index); List<ILSMMemoryComponent> memComponents = index.getMemoryComponents(); int numOtherMemComponents = memComponents.size() - 1; int next = index.getCurrentMemoryComponentIndex(); + LOGGER.log(Level.INFO, index + " has " + numOtherMemComponents + " immutable memory components"); for (int i = 0; i < numOtherMemComponents; i++) { + LOGGER.log(Level.INFO, "trying to get " + key + " from immutable memory components number: " + (i + 1)); next = next - 1; if (next < 0) { next = memComponents.size() - 1; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java index f2b3284..2470a39 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java @@ -75,4 +75,9 @@ public int getFileReferenceCount() { return deletedKeysBTree.getBufferCache().getFileReferenceCount(deletedKeysBTree.getFileId()); } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + ((OnDiskInvertedIndex) invIndex).getInvListsFile().getRelativePath(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java index 982f89b..54ef122 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java @@ -76,4 +76,9 @@ public int getFileReferenceCount() { return rtree.getBufferCache().getFileReferenceCount(rtree.getFileId()); } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + rtree.getFileReference().getRelativePath(); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1921 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7044896559798426c04a3f46861bc5335b25d140 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>