Repository: asterixdb Updated Branches: refs/heads/master 76ecc4b19 -> e5a65429d
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index b0abeb1..e1d5114 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -63,6 +63,7 @@ public class LSMHarness implements ILSMHarness { private static final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName()); protected final ILSMIndex lsmIndex; + protected final ComponentReplacementContext componentReplacementCtx; protected final ILSMMergePolicy mergePolicy; protected final ILSMOperationTracker opTracker; protected final AtomicBoolean fullMergeIsRequested; @@ -84,83 +85,97 @@ public class LSMHarness implements ILSMHarness { if (replicationEnabled) { this.componentsToBeReplicated = new ArrayList<>(); } + componentReplacementCtx = new ComponentReplacementContext(lsmIndex); } protected boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean isTryOperation) throws HyracksDataException { - validateOperationEnterComponentsState(ctx); - synchronized (opTracker) { - while (true) { - lsmIndex.getOperationalComponents(ctx); - // Before entering the components, prune those corner cases that indeed should not proceed. - switch (opType) { - case FLUSH: - // if the lsm index does not have memory components allocated, then nothing to flush - if (!lsmIndex.isMemoryComponentsAllocated()) { - return false; - } - ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0); - if (!flushingComponent.isModified()) { - if (flushingComponent.getState() == ComponentState.READABLE_UNWRITABLE) { - //The mutable component has not been modified by any writer. There is nothing to flush. - //since the component is empty, set its state back to READABLE_WRITABLE only when it's - //state has been set to READABLE_UNWRITABLE - flushingComponent.setState(ComponentState.READABLE_WRITABLE); - opTracker.notifyAll(); - - // Call recycled only when we change it's state is reset back to READABLE_WRITABLE - // Otherwise, if the component is in other state, e.g., INACTIVE, or - // READABLE_UNWRITABLE_FLUSHING, it's not considered as being recycled here. - lsmIndex.getIOOperationCallback().recycled(flushingComponent); + long before = 0L; + if (ctx.isTracingEnabled()) { + before = System.nanoTime(); + } + try { + validateOperationEnterComponentsState(ctx); + synchronized (opTracker) { + while (true) { + lsmIndex.getOperationalComponents(ctx); + // Before entering the components, prune those corner cases that indeed should not proceed. + switch (opType) { + case FLUSH: + // if the lsm index does not have memory components allocated, then nothing to flush + if (!lsmIndex.isMemoryComponentsAllocated()) { + return false; } - return false; - } - if (flushingComponent.getWriterCount() > 0) { - /* - * This case is a case where even though FLUSH log was flushed to disk and scheduleFlush is triggered, - * the current in-memory component (whose state was changed to READABLE_WRITABLE (RW) - * from READABLE_UNWRITABLE(RU) before FLUSH log was written to log tail (which is memory buffer of log file) - * and then the state was changed back to RW (as shown in the following scenario)) can have writers - * based on the current code base/design. - * Thus, the writer count of the component may be greater than 0. - * if this happens, intead of throwing exception, scheduleFlush() deal with this situation by not flushing - * the component. - * Please see issue 884 for more detail information: - * https://code.google.com/p/asterixdb/issues/detail?id=884&q=owner%3Akisskys%40gmail.com&colspec=ID%20Type%20Status%20Priority%20Milestone%20Owner%20Summary%20ETA%20Severity - * - */ - return false; - } - break; - case MERGE: - if (ctx.getComponentHolder().size() < 2 - && ctx.getOperation() != IndexOperation.DELETE_DISK_COMPONENTS) { - // There is only a single component. There is nothing to merge. - return false; + ILSMMemoryComponent flushingComponent = + (ILSMMemoryComponent) ctx.getComponentHolder().get(0); + if (!flushingComponent.isModified()) { + recycle(flushingComponent); + return false; + } + if (flushingComponent.getWriterCount() > 0) { + /* + * This case is a case where even though FLUSH log was flushed to disk + * and scheduleFlush is triggered, the current in-memory component (whose state was + * changed to READABLE_WRITABLE (RW) from READABLE_UNWRITABLE(RU) before FLUSH log + * was written to log tail (which is memory buffer of log file) and then the state was + * changed back to RW (as shown in the following scenario)) can have writers based on + * the current code base/design. Thus, the writer count of the component may be greater + * than 0. if this happens, intead of throwing exception, scheduleFlush() deal with + * this situation by not flushing the component. + * for more detailed information: ASTERIXDB-1027 + */ + return false; + } + break; + case MERGE: + if (ctx.getComponentHolder().size() < 2 + && ctx.getOperation() != IndexOperation.DELETE_DISK_COMPONENTS) { + // There is only a single component. There is nothing to merge. + return false; + } + break; + default: + break; + } + if (enterComponents(ctx, opType)) { + return true; + } else if (isTryOperation) { + return false; + } + try { + // Flush and merge operations should never reach this wait call, + // because they are always try operations. If they fail to enter the components, + // then it means that there are an ongoing flush/merge operation on + // the same components, so they should not proceed. + if (opType == LSMOperationType.MODIFICATION) { + // before waiting, make sure the index is in a modifiable state to avoid waiting forever. + ensureIndexModifiable(); } - break; - default: - break; - } - if (enterComponents(ctx, opType)) { - return true; - } else if (isTryOperation) { - return false; - } - try { - // Flush and merge operations should never reach this wait call, because they are always try operations. - // If they fail to enter the components, then it means that there are an ongoing flush/merge operation on - // the same components, so they should not proceed. - if (opType == LSMOperationType.MODIFICATION) { - // before waiting, make sure the index is in a modifiable state to avoid waiting forever. - ensureIndexModifiable(); + opTracker.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } - opTracker.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw HyracksDataException.create(e); } } + } finally { + if (ctx.isTracingEnabled()) { + ctx.incrementEnterExitTime(System.nanoTime() - before); + } + } + } + + private void recycle(ILSMMemoryComponent flushingComponent) throws HyracksDataException { + if (flushingComponent.getState() == ComponentState.READABLE_UNWRITABLE) { + //The mutable component has not been modified by any writer. + // There is nothing to flush. Since the component is empty, set its state back + // to READABLE_WRITABLE only when it's state has been set to READABLE_UNWRITABLE + flushingComponent.setState(ComponentState.READABLE_WRITABLE); + opTracker.notifyAll(); // NOSONAR: Always synchronized from caller + // Call recycled only when we change it's state is reset back to READABLE_WRITABLE + // Otherwise, if the component is in other state, e.g., INACTIVE, or + // READABLE_UNWRITABLE_FLUSHING, it's not considered as being recycled here. + lsmIndex.getIOOperationCallback().recycled(flushingComponent); } } @@ -179,7 +194,7 @@ public class LSMHarness implements ILSMHarness { numEntered++; } entranceSuccessful = numEntered == components.size(); - } catch (Throwable e) { + } catch (Throwable e) { // NOSONAR: Log and re-throw if (LOGGER.isLoggable(Level.SEVERE)) { LOGGER.log(Level.SEVERE, opType.name() + " failed to enter components on " + lsmIndex, e); } @@ -196,10 +211,12 @@ public class LSMHarness implements ILSMHarness { i++; numEntered--; } - return false; } - ctx.setAccessingComponents(true); } + if (!entranceSuccessful) { + return false; + } + ctx.setAccessingComponents(true); // Check if there is any action that is needed to be taken based on the operation type switch (opType) { case FLUSH: @@ -209,7 +226,7 @@ public class LSMHarness implements ILSMHarness { lsmIndex.changeMutableComponent(); // Notify all waiting threads whenever a flush has been scheduled since they will check // again if they can grab and enter the mutable component. - opTracker.notifyAll(); + opTracker.notifyAll(); // NOSONAR: Always called from a synchronized block break; case MERGE: lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.MERGE); @@ -221,8 +238,8 @@ public class LSMHarness implements ILSMHarness { return true; } - private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMDiskComponent newComponent, - boolean failedOperation) throws HyracksDataException { + private void doExitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, + ILSMDiskComponent newComponent, boolean failedOperation) throws HyracksDataException { /** * FLUSH and MERGE operations should always exit the components * to notify waiting threads. @@ -249,75 +266,10 @@ public class LSMHarness implements ILSMHarness { } else if (opType == LSMOperationType.MERGE) { opTracker.notifyAll(); } - - int i = 0; - // First check if there is any action that is needed to be taken based on the state of each component. - for (ILSMComponent c : ctx.getComponentHolder()) { - boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY ? true : false; - c.threadExit(opType, failedOperation, isMutableComponent); - if (c.getType() == LSMComponentType.MEMORY) { - switch (c.getState()) { - case READABLE_UNWRITABLE: - if (isMutableComponent && (opType == LSMOperationType.MODIFICATION - || opType == LSMOperationType.FORCE_MODIFICATION)) { - lsmIndex.changeFlushStatusForCurrentMutableCompoent(true); - } - break; - case INACTIVE: - tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex.toString()); - ((AbstractLSMMemoryComponent) c).reset(); - // Notify all waiting threads whenever the mutable component's state has changed to - // inactive. This is important because even though we switched the mutable - // components, it is possible that the component that we just switched to is still - // busy flushing its data to disk. Thus, the notification that was issued upon - // scheduling the flush is not enough. - opTracker.notifyAll(); - break; - default: - break; - } - } else { - switch (c.getState()) { - case INACTIVE: - lsmIndex.addInactiveDiskComponent((AbstractLSMDiskComponent) c); - break; - default: - break; - } - } - i++; - } + exitOperationalComponents(ctx, opType, failedOperation); ctx.setAccessingComponents(false); - // Then, perform any action that is needed to be taken based on the operation type. - switch (opType) { - case FLUSH: - // newComponent is null if the flush op. was not performed. - if (!failedOperation && newComponent != null) { - lsmIndex.addDiskComponent(newComponent); - if (replicationEnabled) { - componentsToBeReplicated.clear(); - componentsToBeReplicated.add(newComponent); - triggerReplication(componentsToBeReplicated, false, opType); - } - mergePolicy.diskComponentAdded(lsmIndex, false); - } - break; - case MERGE: - // newComponent is null if the merge op. was not performed. - if (!failedOperation && newComponent != null) { - lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder()); - if (replicationEnabled) { - componentsToBeReplicated.clear(); - componentsToBeReplicated.add(newComponent); - triggerReplication(componentsToBeReplicated, false, opType); - } - mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get()); - } - break; - default: - break; - } - } catch (Throwable e) { + exitOperation(ctx, opType, newComponent, failedOperation); + } catch (Throwable e) { // NOSONAR: Log and re-throw if (LOGGER.isLoggable(Level.SEVERE)) { LOGGER.log(Level.SEVERE, e.getMessage(), e); } @@ -326,7 +278,8 @@ public class LSMHarness implements ILSMHarness { if (failedOperation && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) { //When the operation failed, completeOperation() method must be called - //in order to decrement active operation count which was incremented in beforeOperation() method. + //in order to decrement active operation count which was incremented + // in beforeOperation() method. opTracker.completeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback()); } else { @@ -344,9 +297,9 @@ public class LSMHarness implements ILSMHarness { if (!inactiveDiskComponents.isEmpty()) { for (ILSMDiskComponent inactiveComp : inactiveDiskComponents) { if (inactiveComp.getFileReferenceCount() == 1) { - if (inactiveDiskComponentsToBeDeleted == null) { - inactiveDiskComponentsToBeDeleted = new LinkedList<>(); - } + inactiveDiskComponentsToBeDeleted = + inactiveDiskComponentsToBeDeleted == null ? new LinkedList<>() + : inactiveDiskComponentsToBeDeleted; inactiveDiskComponentsToBeDeleted.add(inactiveComp); } } @@ -370,15 +323,97 @@ public class LSMHarness implements ILSMHarness { for (ILSMDiskComponent c : inactiveDiskComponentsToBeDeleted) { c.deactivateAndDestroy(); } - } catch (Throwable e) { + } catch (Throwable e) { // NOSONAR Log and re-throw if (LOGGER.isLoggable(Level.WARNING)) { LOGGER.log(Level.WARNING, "Failure scheduling replication or destroying merged component", e); } - throw e; + throw e; // NOSONAR: The last call in the finally clause } } } + } + private void exitOperation(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMDiskComponent newComponent, + boolean failedOperation) throws HyracksDataException { + // Then, perform any action that is needed to be taken based on the operation type. + switch (opType) { + case FLUSH: + // newComponent is null if the flush op. was not performed. + if (!failedOperation && newComponent != null) { + lsmIndex.addDiskComponent(newComponent); + if (replicationEnabled) { + componentsToBeReplicated.clear(); + componentsToBeReplicated.add(newComponent); + triggerReplication(componentsToBeReplicated, false, opType); + } + mergePolicy.diskComponentAdded(lsmIndex, false); + } + break; + case MERGE: + // newComponent is null if the merge op. was not performed. + if (!failedOperation && newComponent != null) { + lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder()); + if (replicationEnabled) { + componentsToBeReplicated.clear(); + componentsToBeReplicated.add(newComponent); + triggerReplication(componentsToBeReplicated, false, opType); + } + mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get()); + } + break; + default: + break; + } + } + + private void exitOperationalComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, + boolean failedOperation) throws HyracksDataException { + // First check if there is any action that is needed to be taken + // based on the state of each component. + for (int i = 0; i < ctx.getComponentHolder().size(); i++) { + ILSMComponent c = ctx.getComponentHolder().get(i); + boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY ? true : false; + c.threadExit(opType, failedOperation, isMutableComponent); + if (c.getType() == LSMComponentType.MEMORY) { + switch (c.getState()) { + case READABLE_UNWRITABLE: + if (isMutableComponent && (opType == LSMOperationType.MODIFICATION + || opType == LSMOperationType.FORCE_MODIFICATION)) { + lsmIndex.changeFlushStatusForCurrentMutableCompoent(true); + } + break; + case INACTIVE: + tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex.toString()); + ((AbstractLSMMemoryComponent) c).reset(); + // Notify all waiting threads whenever the mutable component's state + // has changed to inactive. This is important because even though we switched + // the mutable components, it is possible that the component that we just + // switched to is still busy flushing its data to disk. Thus, the notification + // that was issued upon scheduling the flush is not enough. + opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block + break; + default: + break; + } + } else if (c.getState() == ComponentState.INACTIVE) { + lsmIndex.addInactiveDiskComponent((AbstractLSMDiskComponent) c); + } + } + } + + private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMDiskComponent newComponent, + boolean failedOperation) throws HyracksDataException { + long before = 0L; + if (ctx.isTracingEnabled()) { + before = System.nanoTime(); + } + try { + doExitComponents(ctx, opType, newComponent, failedOperation); + } finally { + if (ctx.isTracingEnabled()) { + ctx.incrementEnterExitTime(System.nanoTime() - before); + } + } } @Override @@ -532,7 +567,7 @@ public class LSMHarness implements ILSMHarness { newComponent = lsmIndex.flush(operation); operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent); newComponent.markAsValid(lsmIndex.isDurable()); - } catch (Throwable e) { + } catch (Throwable e) { // NOSONAR Log and re-throw failedOperation = true; if (LOGGER.isLoggable(Level.SEVERE)) { LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e); @@ -562,8 +597,9 @@ public class LSMHarness implements ILSMHarness { throws HyracksDataException { fullMergeIsRequested.set(true); if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { - // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then - // whenever the current merge has finished, it will schedule the full merge again. + // If the merge cannot be scheduled because there is already an ongoing merge on + // subset/all of the components, then whenever the current merge has finished, + // it will schedule the full merge again. callback.afterFinalize(LSMIOOperationType.MERGE, null); return; } @@ -583,7 +619,7 @@ public class LSMHarness implements ILSMHarness { newComponent = lsmIndex.merge(operation); operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent); newComponent.markAsValid(lsmIndex.isDurable()); - } catch (Throwable e) { + } catch (Throwable e) { // NOSONAR: Log and re-throw failedOperation = true; if (LOGGER.isLoggable(Level.SEVERE)) { LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e); @@ -707,6 +743,7 @@ public class LSMHarness implements ILSMHarness { throw e; } finally { exit(ctx); + ctx.logPerformanceCounters(accessor.getTupleCount()); } } @@ -878,4 +915,36 @@ public class LSMHarness implements ILSMHarness { public String toString() { return getClass().getSimpleName() + ":" + lsmIndex; } + + @Override + public void replaceMemoryComponentsWithDiskComponents(ILSMIndexOperationContext ctx, int startIndex) + throws HyracksDataException { + synchronized (opTracker) { + componentReplacementCtx.reset(); + for (int i = 0; i < ctx.getComponentHolder().size(); i++) { + if (i >= startIndex) { + ILSMComponent next = ctx.getComponentHolder().get(i); + if (next.getType() == LSMComponentType.MEMORY + && next.getState() == ComponentState.UNREADABLE_UNWRITABLE) { + componentReplacementCtx.getComponentHolder().add(next); + componentReplacementCtx.swapIndex(i); + } + } + } + if (componentReplacementCtx.getComponentHolder().isEmpty()) { + throw new IllegalStateException( + "replaceMemoryComponentsWithDiskComponents called with no potential components"); + } + // before we exit, we should keep the replaced component ids + // we should also ensure that exact disk component replacement exist + if (componentReplacementCtx.proceed(lsmIndex.getDiskComponents())) { + // exit old component + exitComponents(componentReplacementCtx, LSMOperationType.SEARCH, null, false); + // enter new component + componentReplacementCtx.prepareToEnter(); + enterComponents(componentReplacementCtx, LSMOperationType.SEARCH); + componentReplacementCtx.replace(ctx); + } + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java index 000d5cf..7bc0660 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java @@ -62,7 +62,7 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { //then after operation should be called from harness as well //https://issues.apache.org/jira/browse/ASTERIXDB-1764 lsmIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.LOAD, null, component); - lsmIndex.getLsmHarness().addBulkLoadedComponent(component); + lsmIndex.getHarness().addBulkLoadedComponent(component); } } finally { lsmIndex.getIOOperationCallback().afterFinalize(LSMIOOperationType.LOAD, component); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java index a35d00e..1c28ef1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java @@ -45,7 +45,7 @@ public class LSMIndexReplicationJob extends AbstractReplicationJob implements IL @Override public void endReplication() throws HyracksDataException { if (operationContext != null) { - ((AbstractLSMIndex) (lsmIndex)).getLsmHarness().endReplication(operationContext); + ((AbstractLSMIndex) (lsmIndex)).getHarness().endReplication(operationContext); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java index 724a909..17c681c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.PriorityQueue; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -36,9 +37,13 @@ import org.apache.hyracks.storage.common.MultiComparator; import org.apache.hyracks.storage.common.buffercache.IBufferCache; public abstract class LSMIndexSearchCursor implements ITreeIndexCursor { + protected static final int SWITCH_COMPONENT_CYCLE = 100; protected final ILSMIndexOperationContext opCtx; protected final boolean returnDeletedTuples; protected PriorityQueueElement outputElement; + protected final ArrayTupleBuilder[] switchComponentTupleBuilders; + protected final boolean[] switchRequest; + protected final PriorityQueueElement[] switchedElements; protected IIndexCursor[] rangeCursors; protected PriorityQueueElement[] pqes; protected PriorityQueue<PriorityQueueElement> outputPriorityQueue; @@ -47,6 +52,8 @@ public abstract class LSMIndexSearchCursor implements ITreeIndexCursor { protected boolean needPushElementIntoQueue; protected boolean includeMutableComponent; protected ILSMHarness lsmHarness; + protected boolean switchPossible = true; + protected int hasNextCallCount = 0; protected List<ILSMComponent> operationalComponents; @@ -55,6 +62,9 @@ public abstract class LSMIndexSearchCursor implements ITreeIndexCursor { this.returnDeletedTuples = returnDeletedTuples; outputElement = null; needPushElementIntoQueue = false; + switchComponentTupleBuilders = new ArrayTupleBuilder[opCtx.getIndex().getNumberOfAllMemoryComponents()]; + switchRequest = new boolean[switchComponentTupleBuilders.length]; + switchedElements = new PriorityQueueElement[switchComponentTupleBuilders.length]; } public ILSMIndexOperationContext getOpCtx() { @@ -98,9 +108,13 @@ public abstract class LSMIndexSearchCursor implements ITreeIndexCursor { @Override public void reset() throws HyracksDataException { + hasNextCallCount = 0; + switchPossible = true; outputElement = null; needPushElementIntoQueue = false; - + for (int i = 0; i < switchRequest.length; i++) { + switchRequest[i] = false; + } try { if (outputPriorityQueue != null) { outputPriorityQueue.clear(); @@ -121,6 +135,7 @@ public abstract class LSMIndexSearchCursor implements ITreeIndexCursor { @Override public boolean hasNext() throws HyracksDataException { + hasNextCallCount++; checkPriorityQueue(); return !outputPriorityQueue.isEmpty(); } @@ -196,7 +211,7 @@ public abstract class LSMIndexSearchCursor implements ITreeIndexCursor { } protected void checkPriorityQueue() throws HyracksDataException { - while (!outputPriorityQueue.isEmpty() || (needPushElementIntoQueue == true)) { + while (!outputPriorityQueue.isEmpty() || needPushElementIntoQueue) { if (!outputPriorityQueue.isEmpty()) { PriorityQueueElement checkElement = outputPriorityQueue.peek(); // If there is no previous tuple or the previous tuple can be ignored http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java index 9e9f78a..030e4fd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java @@ -237,7 +237,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex deletedKeysBTreeAccessors, ((LSMInvertedIndexMemoryComponent) memoryComponents.get(currentMutableComponentId.get())) .getBuddyIndex().getLeafFrameFactory(), - ictx, includeMutableComponent, getLsmHarness(), operationalComponents); + ictx, includeMutableComponent, getHarness(), operationalComponents); } else { LSMInvertedIndexMemoryComponent mutableComponent = (LSMInvertedIndexMemoryComponent) memoryComponents.get(currentMutableComponentId.get()); @@ -246,7 +246,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex initState = new LSMInvertedIndexRangeSearchCursorInitialState(tokensAndKeysCmp, keyCmp, keysOnlyTuple, ((LSMInvertedIndexMemoryComponent) memoryComponents.get(currentMutableComponentId.get())) .getBuddyIndex().getLeafFrameFactory(), - includeMutableComponent, getLsmHarness(), indexAccessors, deletedKeysBTreeAccessors, pred, + includeMutableComponent, getHarness(), indexAccessors, deletedKeysBTreeAccessors, pred, operationalComponents); } return initState; @@ -427,15 +427,15 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex @Override public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException { - return new LSMInvertedIndexAccessor(getLsmHarness(), + return new LSMInvertedIndexAccessor(getHarness(), createOpContext(iap.getModificationCallback(), iap.getSearchOperationCallback())); } @Override protected LSMInvertedIndexOpContext createOpContext(IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback) throws HyracksDataException { - return new LSMInvertedIndexOpContext(memoryComponents, modificationCallback, searchCallback, - invertedIndexFieldsForNonBulkLoadOps, filterFieldsForNonBulkLoadOps, getFilterCmpFactories()); + return new LSMInvertedIndexOpContext(this, memoryComponents, modificationCallback, searchCallback, + invertedIndexFieldsForNonBulkLoadOps, filterFieldsForNonBulkLoadOps, getFilterCmpFactories(), tracer); } @Override @@ -481,7 +481,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx, LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { - return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getLsmHarness(), opCtx), + return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getHarness(), opCtx), componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); } @@ -489,7 +489,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex @Override protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx, LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { - ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getLsmHarness(), opCtx); + ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getHarness(), opCtx); IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx); return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java index 12341da..55da252 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java @@ -25,12 +25,14 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor; import org.apache.hyracks.storage.common.IIndexAccessor; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; +import org.apache.hyracks.util.trace.ITracer; public class LSMInvertedIndexOpContext extends AbstractLSMIndexOperationContext { @@ -45,11 +47,12 @@ public class LSMInvertedIndexOpContext extends AbstractLSMIndexOperationContext private IInvertedIndexAccessor currentMutableInvIndexAccessors; private IIndexAccessor currentDeletedKeysBTreeAccessors; - public LSMInvertedIndexOpContext(List<ILSMMemoryComponent> mutableComponents, + public LSMInvertedIndexOpContext(ILSMIndex index, List<ILSMMemoryComponent> mutableComponents, IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback, - int[] invertedIndexFields, int[] filterFields, IBinaryComparatorFactory[] filterComparatorFactories) - throws HyracksDataException { - super(invertedIndexFields, filterFields, filterComparatorFactories, searchCallback, modificationCallback); + int[] invertedIndexFields, int[] filterFields, IBinaryComparatorFactory[] filterComparatorFactories, + ITracer tracer) throws HyracksDataException { + super(index, invertedIndexFields, filterFields, filterComparatorFactories, searchCallback, modificationCallback, + tracer); mutableInvIndexAccessors = new IInvertedIndexAccessor[mutableComponents.size()]; deletedKeysBTreeAccessors = new IIndexAccessor[mutableComponents.size()]; for (int i = 0; i < mutableComponents.size(); i++) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java index fda3775..623fc74 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java @@ -81,4 +81,5 @@ public class OnDiskInvertedIndexOpContext implements IIndexOperationContext { public MultiComparator getPrefixSearchCmp() { return prefixSearchCmp; } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java index 6918b19..9960590 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java @@ -72,7 +72,7 @@ public class ExternalRTreeLocalResource extends LSMRTreeLocalResource { bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, ncServiceCtx), opTrackerProvider.getOperationTracker(ncServiceCtx), ioSchedulerProvider.getIoScheduler(ncServiceCtx), ioOpCallbackFactory, linearizeCmpFactory, buddyBTreeFields, durable, isPointMBR, - metadataPageManagerFactory); + metadataPageManagerFactory, ncServiceCtx.getTracer()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java index 152b0a8..aac8905 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java @@ -127,9 +127,9 @@ public abstract class AbstractLSMRTree extends AbstractLSMIndex implements ITree ILinearizeComparatorFactory linearizer, int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable, - boolean isPointMBR) { + boolean isPointMBR, ITracer tracer) { super(ioManager, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy, opTracker, - ioScheduler, ioOpCallbackFactory, componentFactory, componentFactory, durable); + ioScheduler, ioOpCallbackFactory, componentFactory, componentFactory, durable, tracer); this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory; this.rtreeLeafFrameFactory = rtreeLeafFrameFactory; this.btreeInteriorFrameFactory = btreeInteriorFrameFactory; @@ -231,9 +231,9 @@ public abstract class AbstractLSMRTree extends AbstractLSMIndex implements ITree @Override protected LSMRTreeOpContext createOpContext(IModificationOperationCallback modCallback, ISearchOperationCallback searchCallback) { - return new LSMRTreeOpContext(memoryComponents, rtreeLeafFrameFactory, rtreeInteriorFrameFactory, - btreeLeafFrameFactory, modCallback, searchCallback, getTreeFields(), getFilterFields(), getLsmHarness(), - comparatorFields, linearizerArray, getFilterCmpFactories()); + return new LSMRTreeOpContext(this, memoryComponents, rtreeLeafFrameFactory, rtreeInteriorFrameFactory, + btreeLeafFrameFactory, modCallback, searchCallback, getTreeFields(), getFilterFields(), getHarness(), + comparatorFields, linearizerArray, getFilterCmpFactories(), tracer); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java index ca24b13..6f61935 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java @@ -61,6 +61,7 @@ import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.util.trace.ITracer; /** * This is an lsm r-tree that does not have memory component and is modified @@ -89,18 +90,18 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, int[] buddyBTreeFields, boolean durable, - boolean isPointMBR) { + boolean isPointMBR, ITracer tracer) { super(ioManager, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, bloomFilterFalsePositiveRate, rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields, linearizerArray, mergePolicy, - opTracker, ioScheduler, ioOpCallbackFactory, buddyBTreeFields, durable, isPointMBR); + opTracker, ioScheduler, ioOpCallbackFactory, buddyBTreeFields, durable, isPointMBR, tracer); this.secondDiskComponents = new LinkedList<>(); this.fieldCount = fieldCount; } @Override - public ExternalIndexHarness getLsmHarness() { - return (ExternalIndexHarness) super.getLsmHarness(); + public ExternalIndexHarness getHarness() { + return (ExternalIndexHarness) super.getHarness(); } // The subsume merged components is overridden to account for: @@ -207,7 +208,7 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { diskComponents.add(component); secondDiskComponents.add(component); } - getLsmHarness().indexFirstTimeActivated(); + getHarness().indexFirstTimeActivated(); } else { // This index has been opened before or is brand new with no components // components. It should also maintain the version pointer @@ -349,7 +350,7 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { if (!isActive) { throw new HyracksDataException("Failed to clear the index since it is not activated."); } - getLsmHarness().indexClear(); + getHarness().indexClear(); for (ILSMDiskComponent c : diskComponents) { c.deactivateAndDestroy(); @@ -532,7 +533,7 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { component.markAsValid(durable); component.deactivate(); } else { - getLsmHarness().addBulkLoadedComponent(component); + getHarness().addBulkLoadedComponent(component); } } } @@ -587,7 +588,7 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { LSMComponentFileReferences relMergeFileRefs = getMergeFileReferences((ILSMDiskComponent) mergingComponents.get(0), (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1)); - ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), rctx, buddyBTreeFields); + ILSMIndexAccessor accessor = new LSMRTreeAccessor(getHarness(), rctx, buddyBTreeFields); // create the merge operation. LSMRTreeMergeOperation mergeOp = new LSMRTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(), @@ -598,22 +599,22 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { @Override public ILSMIndexAccessor createAccessor(ISearchOperationCallback searchCallback, int targetIndexVersion) throws HyracksDataException { - return new LSMRTreeAccessor(getLsmHarness(), createOpContext(searchCallback, targetIndexVersion), + return new LSMRTreeAccessor(getHarness(), createOpContext(searchCallback, targetIndexVersion), buddyBTreeFields); } // This method creates the appropriate opContext for the targeted version public ExternalRTreeOpContext createOpContext(ISearchOperationCallback searchCallback, int targetVersion) { - return new ExternalRTreeOpContext(rtreeCmpFactories, btreeCmpFactories, searchCallback, targetVersion, - getLsmHarness(), comparatorFields, linearizerArray, rtreeLeafFrameFactory, rtreeInteriorFrameFactory, - btreeLeafFrameFactory); + return new ExternalRTreeOpContext(this, rtreeCmpFactories, btreeCmpFactories, searchCallback, targetVersion, + getHarness(), comparatorFields, linearizerArray, rtreeLeafFrameFactory, rtreeInteriorFrameFactory, + btreeLeafFrameFactory, tracer); } // The accessor for disk only indexes don't use modification callback and // always carry the target index version with them @Override public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) { - return new LSMRTreeAccessor(getLsmHarness(), createOpContext(iap.getSearchOperationCallback(), version), + return new LSMRTreeAccessor(getHarness(), createOpContext(iap.getSearchOperationCallback(), version), buddyBTreeFields); } @@ -646,7 +647,7 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { componentFileRefrences.getDeleteIndexFileReference(), componentFileRefrences.getBloomFilterFileReference(), false); } - getLsmHarness().addTransactionComponents(component); + getHarness().addTransactionComponents(component); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java index 73be537..09237a3 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java @@ -21,9 +21,11 @@ package org.apache.hyracks.storage.am.lsm.rtree.impls; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.MultiComparator; +import org.apache.hyracks.util.trace.ITracer; public class ExternalRTreeOpContext extends AbstractLSMIndexOperationContext { private MultiComparator bTreeCmp; @@ -31,12 +33,13 @@ public class ExternalRTreeOpContext extends AbstractLSMIndexOperationContext { private final int targetIndexVersion; private LSMRTreeCursorInitialState initialState; - public ExternalRTreeOpContext(IBinaryComparatorFactory[] rtreeCmpFactories, + public ExternalRTreeOpContext(ILSMIndex index, IBinaryComparatorFactory[] rtreeCmpFactories, IBinaryComparatorFactory[] btreeCmpFactories, ISearchOperationCallback searchCallback, int targetIndexVersion, ILSMHarness lsmHarness, int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ITreeIndexFrameFactory rtreeLeafFrameFactory, - ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory) { - super(null, null, null, searchCallback, null); + ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory, + ITracer tracer) { + super(index, null, null, null, searchCallback, null, tracer); this.targetIndexVersion = targetIndexVersion; this.bTreeCmp = MultiComparator.create(btreeCmpFactories); this.rTreeCmp = MultiComparator.create(rtreeCmpFactories); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index 73cb206..9712b7c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -63,6 +63,7 @@ import org.apache.hyracks.storage.common.IIndexAccessParameters; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.util.trace.ITracer; public class LSMRTree extends AbstractLSMRTree { protected final int[] buddyBTreeFields; @@ -99,11 +100,11 @@ public class LSMRTree extends AbstractLSMRTree { int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, int[] buddyBTreeFields, boolean durable, - boolean isPointMBR) { + boolean isPointMBR, ITracer tracer) { super(ioManager, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields, linearizerArray, bloomFilterFalsePositiveRate, - mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, durable, isPointMBR); + mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, durable, isPointMBR, tracer); this.buddyBTreeFields = buddyBTreeFields; } @@ -276,7 +277,7 @@ public class LSMRTree extends AbstractLSMRTree { @Override public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) { - return new LSMRTreeAccessor(getLsmHarness(), + return new LSMRTreeAccessor(getHarness(), createOpContext(iap.getModificationCallback(), iap.getSearchOperationCallback()), buddyBTreeFields); } @@ -321,7 +322,7 @@ public class LSMRTree extends AbstractLSMRTree { protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx, LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { - LSMRTreeAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields); + LSMRTreeAccessor accessor = new LSMRTreeAccessor(getHarness(), opCtx, buddyBTreeFields); return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); @@ -331,7 +332,7 @@ public class LSMRTree extends AbstractLSMRTree { protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx, LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { ITreeIndexCursor cursor = new LSMRTreeSortedCursor(opCtx, linearizer, buddyBTreeFields); - ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields); + ILSMIndexAccessor accessor = new LSMRTreeAccessor(getHarness(), opCtx, buddyBTreeFields); return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java index f54f8b9..50f1961 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java @@ -29,6 +29,7 @@ import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext; import org.apache.hyracks.storage.am.rtree.impls.RTree; @@ -37,6 +38,7 @@ import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.MultiComparator; +import org.apache.hyracks.util.trace.ITracer; public final class LSMRTreeOpContext extends AbstractLSMIndexOperationContext { @@ -50,12 +52,14 @@ public final class LSMRTreeOpContext extends AbstractLSMIndexOperationContext { private BTreeOpContext currentBTreeOpContext; private LSMRTreeCursorInitialState searchInitialState; - public LSMRTreeOpContext(List<ILSMMemoryComponent> mutableComponents, ITreeIndexFrameFactory rtreeLeafFrameFactory, - ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory, - IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback, - int[] rtreeFields, int[] filterFields, ILSMHarness lsmHarness, int[] comparatorFields, - IBinaryComparatorFactory[] linearizerArray, IBinaryComparatorFactory[] filterComparatorFactories) { - super(rtreeFields, filterFields, filterComparatorFactories, searchCallback, modificationCallback); + public LSMRTreeOpContext(ILSMIndex index, List<ILSMMemoryComponent> mutableComponents, + ITreeIndexFrameFactory rtreeLeafFrameFactory, ITreeIndexFrameFactory rtreeInteriorFrameFactory, + ITreeIndexFrameFactory btreeLeafFrameFactory, IModificationOperationCallback modificationCallback, + ISearchOperationCallback searchCallback, int[] rtreeFields, int[] filterFields, ILSMHarness lsmHarness, + int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, + IBinaryComparatorFactory[] filterComparatorFactories, ITracer tracer) { + super(index, rtreeFields, filterFields, filterComparatorFactories, searchCallback, modificationCallback, + tracer); mutableRTreeAccessors = new RTree.RTreeAccessor[mutableComponents.size()]; mutableBTreeAccessors = new BTree.BTreeAccessor[mutableComponents.size()]; rtreeOpContexts = new RTreeOpContext[mutableComponents.size()]; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java index a20eeb7..41e9b92 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java @@ -214,14 +214,14 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { @Override public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) { LSMRTreeOpContext opCtx = createOpContext(iap.getModificationCallback(), iap.getSearchOperationCallback()); - return new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); + return new LSMTreeIndexAccessor(getHarness(), opCtx, cursorFactory); } @Override protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx, LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { - ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); + ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getHarness(), opCtx, cursorFactory); return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), null, null, callback, fileManager.getBaseDir().getAbsolutePath()); } @@ -235,7 +235,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { returnDeletedTuples = true; } ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples); - ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); + ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getHarness(), opCtx, cursorFactory); return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), null, null, callback, fileManager.getBaseDir().getAbsolutePath()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java index c9df11f..451b122 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java @@ -72,6 +72,7 @@ import org.apache.hyracks.storage.am.rtree.linearize.ZCurveDoubleComparatorFacto import org.apache.hyracks.storage.am.rtree.linearize.ZCurveIntComparatorFactory; import org.apache.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriterFactory; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.util.trace.ITracer; public class LSMRTreeUtils { public static LSMRTree createLSMTree(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches, @@ -243,7 +244,7 @@ public class LSMRTreeUtils { ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILinearizeComparatorFactory linearizeCmpFactory, int[] buddyBTreeFields, boolean durable, boolean isPointMBR, - IMetadataPageManagerFactory freePageManagerFactory) throws HyracksDataException { + IMetadataPageManagerFactory freePageManagerFactory, ITracer tracer) throws HyracksDataException { int keyFieldCount = rtreeCmpFactories.length; int valueFieldCount = typeTraits.length - keyFieldCount; @@ -290,7 +291,7 @@ public class LSMRTreeUtils { btreeLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, bloomFilterFalsePositiveRate, typeTraits.length, rtreeCmpFactories, btreeCmpFactories, linearizeCmpFactory, comparatorFields, linearizerArray, mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, buddyBTreeFields, durable, - isPointMBR); + isPointMBR, tracer); } public static ILinearizeComparatorFactory proposeBestLinearizer(ITypeTraits[] typeTraits, int numKeyFields) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java index 70a29db..8782565 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java @@ -60,6 +60,7 @@ public class TestLsmBtree extends LSMBTree { private final List<ITestOpCallback> searchCallbacks = new ArrayList<>(); private final List<ITestOpCallback> flushCallbacks = new ArrayList<>(); private final List<ITestOpCallback> mergeCallbacks = new ArrayList<>(); + private final List<ITestOpCallback> allocateComponentCallbacks = new ArrayList<>(); private volatile int numScheduledFlushes; private volatile int numStartedFlushes; @@ -171,7 +172,7 @@ public class TestLsmBtree extends LSMBTree { @Override public ILSMIndexAccessor createAccessor(AbstractLSMIndexOperationContext opCtx) { - return new LSMTreeIndexAccessor(getLsmHarness(), opCtx, ctx -> new TestLsmBtreeSearchCursor(ctx, this)); + return new LSMTreeIndexAccessor(getHarness(), opCtx, ctx -> new TestLsmBtreeSearchCursor(ctx, this)); } public int getNumScheduledFlushes() { @@ -257,4 +258,26 @@ public class TestLsmBtree extends LSMBTree { public Semaphore getSearchSemaphore() { return searchSemaphore; } + + public void addAllocateCallback(ITestOpCallback callback) { + synchronized (allocateComponentCallbacks) { + allocateComponentCallbacks.add(callback); + } + } + + public void clearAllocateCallbacks() { + synchronized (allocateComponentCallbacks) { + allocateComponentCallbacks.clear(); + } + } + + @Override + public void allocateMemoryComponents() throws HyracksDataException { + super.allocateMemoryComponents(); + synchronized (allocateComponentCallbacks) { + for (ITestOpCallback callback : allocateComponentCallbacks) { + callback(callback, null); + } + } + } }