Young-Seok Kim has submitted this change and it was merged. Change subject: ASTERIXDB-1011: added flow control for merge policy See the design document here: https://cwiki.apache.org/confluence/display/ASTERIXDB/Flush-Operation+Flow+Control+For+Merge+Policy ......................................................................
ASTERIXDB-1011: added flow control for merge policy See the design document here: https://cwiki.apache.org/confluence/display/ASTERIXDB/Flush-Operation+Flow+Control+For+Merge+Policy Change-Id: Ide99c022861f96cd60bc8f5795c4964ab02b3e14 Reviewed-on: https://asterix-gerrit.ics.uci.edu/795 Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java 6 files changed, 344 insertions(+), 37 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Jenkins: Verified diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index a7374d3..3d112ef 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -46,13 +46,13 @@ private final int datasetID; public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int datasetID) { - this.datasetLifecycleManager = (DatasetLifecycleManager)datasetLifecycleManager; + this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager; this.datasetID = datasetID; } @Override - public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException, - IndexException { + public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) + throws HyracksDataException, IndexException { // This merge policy will only look at primary indexes in order to evaluate if a merge operation is needed. If it decides that // a merge operation is needed, then it will merge *all* the indexes that belong to the dataset. The criteria to decide if a merge // is needed is the same as the one that is used in the prefix merge policy: @@ -113,8 +113,8 @@ // Reverse the components order back to its original order Collections.reverse(mergableComponents); - ILSMIndexAccessor accessor = lsmIndex.createAccessor( - NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, + NoOpOperationCallback.INSTANCE); accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); } break; @@ -127,4 +127,10 @@ maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size")); maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count")); } + + @Override + public boolean isMergeLagging(ILSMIndex index) { + //TODO implement properly according to the merge policy + return false; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java index 841117b..c64fe63 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java @@ -25,8 +25,35 @@ import org.apache.hyracks.storage.am.common.api.IndexException; public interface ILSMMergePolicy { - public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException, - IndexException; + public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) + throws HyracksDataException, IndexException; public void configure(Map<String, String> properties); + + /** + * This method is used for flush-operation flow control: + * When the space occupancy of the in-memory component exceeds a specified memory threshold, + * entries are flushed to disk. As entries accumulate on disk, the entries are periodically + * merged together subject to a merge policy that decides when and what to merge. A merge + * policy may impose a certain constraint such as a maximum number of mergable(merge-able) + * disk components in order to provide a reasonable query response time. Otherwise, the query + * response time gets slower as the number of disk components increases. + * In order to avoid such an unexpected situation according to the merge policy, a way to + * control the number of disk components is provided by introducing a new method, + * isMegeLagging() in ILSMMergePolicy interface. When flushing an in-memory component is completed, + * the provided isMergeLagging() method is called to decide whether the memory budget for the + * current flushed in-memory component should be available for the incoming updated(inserted/deleted/updated) + * entries or not. If the method returns true, i.e., the merge operation is lagged according to + * the merge policy, the memory budget will not be made available for the incoming entries by + * making the current flush operation thread wait until (ongoing) merge operation finishes. + * Therefore, this will effectively prevent the number of disk components from exceeding + * a threshold of the allowed number of disk components. + * + * @param index + * @return true if merge operation is lagged according to the implemented merge policy, + * false otherwise. + * @throws HyracksDataException + * @throws IndexException + */ + public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java index 93d6fd4..aad5bf4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java @@ -35,20 +35,20 @@ private int numComponents; @Override - public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException, - IndexException { + public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) + throws HyracksDataException, IndexException { List<ILSMComponent> immutableComponents = index.getImmutableComponents(); - for (ILSMComponent c : immutableComponents) { - if (c.getState() != ComponentState.READABLE_UNWRITABLE) { - return; - } + + if (!areComponentsMergable(immutableComponents)) { + return; } + if (fullMergeIsRequested) { - ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE, + ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleFullMerge(index.getIOOperationCallback()); } else if (immutableComponents.size() >= numComponents) { - ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE, + ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); } @@ -58,4 +58,89 @@ public void configure(Map<String, String> properties) { numComponents = Integer.parseInt(properties.get("num-components")); } + + @Override + public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException { + // see PrefixMergePolicy.isMergeLagging() for the rationale behind this code. + + /** + * case 1. + * if totalImmutableCommponentCount < threshold, + * merge operation is not lagged ==> return false. + * case 2. + * if a) totalImmutableCommponentCount >= threshold && b) there is an ongoing merge, + * merge operation is lagged. ==> return true. + * case 3. *SPECIAL CASE* + * if a) totalImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge, + * merge operation is lagged. ==> *schedule a merge operation* and then return true. + * This is a special case that requires to schedule a merge operation. + * Otherwise, all flush operations will be hung. + * This case can happen in a following situation: + * The system may crash when + * condition 1) the mergableImmutableCommponentCount >= threshold and + * condition 2) merge operation is going on. + * After the system is recovered, still condition 1) is true. + * If there are flush operations in the same dataset partition after the recovery, + * all these flush operations may not proceed since there is no ongoing merge and + * there will be no new merge either in this situation. + */ + + List<ILSMComponent> immutableComponents = index.getImmutableComponents(); + int totalImmutableComponentCount = immutableComponents.size(); + + // [case 1] + if (totalImmutableComponentCount < numComponents) { + return false; + } + + boolean isMergeOngoing = isMergeOngoing(immutableComponents); + + // here, implicitly (totalImmutableComponentCount >= numComponents) is true by passing case 1. + if (isMergeOngoing) { + // [case 2] + return true; + } else { + // [case 3] + // schedule a merge operation after making sure that all components are mergable + if (!areComponentsMergable(immutableComponents)) { + throw new IllegalStateException(); + } + ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, + NoOpOperationCallback.INSTANCE); + accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); + return true; + } + } + + /** + * checks whether all given components are mergable or not + * + * @param immutableComponents + * @return true if all components are mergable, false otherwise. + */ + private boolean areComponentsMergable(List<ILSMComponent> immutableComponents) { + for (ILSMComponent c : immutableComponents) { + if (c.getState() != ComponentState.READABLE_UNWRITABLE) { + return false; + } + } + return true; + } + + /** + * This method returns whether there is an ongoing merge operation or not by checking + * each component state of given components. + * + * @param immutableComponents + * @return true if there is an ongoing merge operation, false otherwise. + */ + private boolean isMergeOngoing(List<ILSMComponent> immutableComponents) { + int size = immutableComponents.size(); + for (int i = 0; i < size; i++) { + if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) { + return true; + } + } + return false; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index a19532f..b58cc29 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -200,6 +200,25 @@ try { synchronized (opTracker) { try { + + /** + * [flow control] + * If merge operations are lagged according to the merge policy, + * flushing in-memory components are hold until the merge operation catches up. + * See PrefixMergePolicy.isMergeLagging() for more details. + */ + if (opType == LSMOperationType.FLUSH) { + while (mergePolicy.isMergeLagging(lsmIndex)) { + try { + opTracker.wait(); + } catch (InterruptedException e) { + //ignore + } + } + } else if (opType == LSMOperationType.MERGE) { + opTracker.notifyAll(); + } + int i = 0; // First check if there is any action that is needed to be taken based on the state of each component. for (ILSMComponent c : ctx.getComponentHolder()) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java index 39ab815..86be9c8 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java @@ -28,8 +28,8 @@ public class NoMergePolicy implements ILSMMergePolicy { @Override - public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException, - IndexException { + public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) + throws HyracksDataException, IndexException { // Do nothing } @@ -37,4 +37,9 @@ public void configure(Map<String, String> properties) { // Do nothing } + + @Override + public boolean isMergeLagging(ILSMIndex index) { + return false; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java index 5b8da53..36cb958 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java @@ -39,8 +39,188 @@ private int maxToleranceComponentCount; @Override - public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException, - IndexException { + public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) + throws HyracksDataException, IndexException { + + List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents()); + + if (!areComponentsReadableWritableState(immutableComponents)) { + return; + } + + if (fullMergeIsRequested) { + ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, + NoOpOperationCallback.INSTANCE); + accessor.scheduleFullMerge(index.getIOOperationCallback()); + return; + } + + scheduleMerge(index); + } + + @Override + public void configure(Map<String, String> properties) { + maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size")); + maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count")); + } + + @Override + public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException { + + /** + * [for flow-control purpose] + * when merge operations are lagged, threads which flushed components will be blocked + * until merge operations catch up, i.e, until the number of mergable immutable components <= maxToleranceComponentCount + * example: + * suppose that maxToleranceComponentCount = 3 and maxMergableComponentSize = 1GB + * The following shows a set of events occurred in time ti with a brief description. + * time + * t40: c32-1(1GB, RU) c38-33(192MB, RU) c39-39(32MB, RU) c40-40(32MB, RU) + * --> a thread which added c40-40 will trigger a merge including c38-33,c39-39,c40-40 + * t41: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU) + * --> a thread which added c41-41 will not be blocked + * t42: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) + * --> a thread which added c42-42 will not be blocked + * t43: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU) + * --> a thread which added c43-43 will not be blocked and will not trigger a merge since there is an ongoing merge triggered in t1. + * t44: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU) 'c44-44(32MB, RU)' + * --> a thread which will add c44-44 (even if the disk component is created, but not added to index instance disk components yet) + * will be blocked until the number of RU components < maxToleranceComponentCount + * t45: c32-1(1GB, RU) *c40-33(256MB, RU)* c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU) 'c44-44(32MB, RU)' + * --> a thread which completed the merge triggered in t1 added c40-33 and will go ahead and trigger the next merge with c40-33,c41-41,c42-42,c43-43. + * Still, the blocked thread will continue being blocked and the c44-44 was not included in the merge since it's not added yet. + * t46: c32-1(1GB, RU) c40-33(256MB, RUM) c41-41(32MB, RUM) c42-42(32MB, RUM) c43-43(32MB, RUM) c44-44(32MB, RUM) + * --> the merge triggered in t45 is going on and the merge unblocked the blocked thread, so c44-44 was added. + * t47: c32-1(1GB, RU) *c43-33(320MB, RU)* c44-44(32MB, RUM) + * --> a thread completed the merge triggered in t45 and added c43-33. + * t48: c32-1(1GB, RU) c43-33(320MB, RU) c44-44(32MB, RUM) c48-48(32MB, RU) + * --> a thread added c48-48 and will not be blocked and will trigger a merge with c43-44, c44-44, c48-48. + * ... continues ... + * ---------------------------------------- + * legend: + * For example, C32-1 represents a disk component, more specifically, disk component name, where 32-1 represents a timestamp range from t1 to time t32. + * This means that the component C32-1 is a component resulting from a merge operation that merged components C1-1 to C32-32. + * This also implies that if two timestamps in a component name are equal, the component has not been merged yet after it was created. + * RU and RUM are possible state of disk components, where RU represents READABLE_UNWRITABLE and RUM represents READABLE_UNWRITABLE_MERGING. + * Now, c32-1(1GB, RU) represents a disk component resulted from merging c1-1 ~ c32-32 and the component size is 1GB. + * ---------------------------------------- + * The flow control allows at most maxToleranceComponentCount mergable components, + * where the mergable components are disk components whose i) state == RU and ii) size < maxMergableComponentSize. + */ + + /** + * case 1. + * if mergableImmutableCommponentCount < threshold, + * merge operation is not lagged ==> return false. + * case 2. + * if a) mergableImmutableCommponentCount >= threshold && b) there is an ongoing merge, + * merge operation is lagged. ==> return true. + * case 3. *SPECIAL CASE* + * if a) mergableImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge, + * merge operation is lagged. ==> *schedule a merge operation* and then return true. + * This is a special case that requires to schedule a merge operation. + * Otherwise, all flush operations will be hung. + * This case can happen in a following situation: + * The system may crash when + * condition 1) the mergableImmutableCommponentCount >= threshold and + * condition 2) merge operation is going on. + * After the system is recovered, still condition 1) is true. + * If there are flush operations in the same dataset partition after the recovery, + * all these flush operations may not proceed since there is no ongoing merge and + * there will be no new merge either in this situation. + */ + + List<ILSMComponent> immutableComponents = index.getImmutableComponents(); + int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents); + + // [case 1] + if (mergableImmutableComponentCount < maxToleranceComponentCount) { + return false; + } + + boolean isMergeOngoing = isMergeOngoing(immutableComponents); + + // here, implicitly (mergableImmutableComponentCount >= maxToleranceComponentCount) is true by passing case 1. + if (isMergeOngoing) { + // [case 2] + return true; + } else { + // [case 3] + // make sure that all components are of READABLE_UNWRITABLE state. + if (!areComponentsReadableWritableState(immutableComponents)) { + throw new IllegalStateException(); + } + // schedule a merge operation + boolean isMergeTriggered = scheduleMerge(index); + if (!isMergeTriggered) { + throw new IllegalStateException(); + } + return true; + } + } + + /** + * This method returns whether there is an ongoing merge operation or not by checking + * each component state of given components. + * + * @param immutableComponents + * @return true if there is an ongoing merge operation, false otherwise. + */ + private boolean isMergeOngoing(List<ILSMComponent> immutableComponents) { + int size = immutableComponents.size(); + for (int i = 0; i < size; i++) { + if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) { + return true; + } + } + return false; + } + + /** + * This method returns the number of mergable components among the given list + * of immutable components that are ordered from the latest component to order ones. A caller + * need to make sure the order in the list. + * + * @param immutableComponents + * @return the number of mergable component + */ + private int getMergableImmutableComponentCount(List<ILSMComponent> immutableComponents) { + int count = 0; + for (ILSMComponent c : immutableComponents) { + long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize(); + //stop when the first non-mergable component is found. + if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize) { + break; + } + ++count; + } + return count; + } + + /** + * checks whether all given components are of READABLE_UNWRITABLE state + * + * @param immutableComponents + * @return true if all components are of READABLE_UNWRITABLE state, false otherwise. + */ + private boolean areComponentsReadableWritableState(List<ILSMComponent> immutableComponents) { + for (ILSMComponent c : immutableComponents) { + if (c.getState() != ComponentState.READABLE_UNWRITABLE) { + return false; + } + } + return true; + } + + /** + * schedule a merge operation according to this prefix merge policy + * + * @param index + * @return true if merge is scheduled, false otherwise. + * @throws HyracksDataException + * @throws IndexException + */ + private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException, IndexException { // 1. Look at the candidate components for merging in oldest-first order. If one exists, identify the prefix of the sequence of // all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component. // 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule @@ -49,17 +229,6 @@ // Reverse the components order so that we look at components from oldest to newest. Collections.reverse(immutableComponents); - for (ILSMComponent c : immutableComponents) { - if (c.getState() != ComponentState.READABLE_UNWRITABLE) { - return; - } - } - if (fullMergeIsRequested) { - ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); - accessor.scheduleFullMerge(index.getIOOperationCallback()); - return; - } long totalSize = 0; int startIndex = -1; for (int i = 0; i < immutableComponents.size(); i++) { @@ -80,17 +249,13 @@ } // Reverse the components order back to its original order Collections.reverse(mergableComponents); - ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE, + ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents); - break; + return true; } } + return false; } - @Override - public void configure(Map<String, String> properties) { - maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size")); - maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count")); - } } -- To view, visit https://asterix-gerrit.ics.uci.edu/795 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ide99c022861f96cd60bc8f5795c4964ab02b3e14 Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Young-Seok Kim <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Young-Seok Kim <[email protected]>
