[ASTERIXDB-2103][STO] Too many disk components for CorrelatedPolicy - user model changes: no - storage format changes: no - interface changes: yes
Details: Currently, CorrelatedMergePolicy uses component IDs to ensure that disk components of the primary and secondary indexes are merged together, but it does so without synchronization. As a result, too many disk components accumulate for a secondary InvertedIndex. The reason is that a secondary index can miss some rounds of merges if the merge policy finds that the corresponding secondary components are not available (because they are either being merged or being flushed). Even though flow control on secondary indexes guarantees that the secondary index will catch up the next time, it is still possible that the primary component is finalized, in which case the secondary components that missed this round of merges are never merged again. This patch fixes this bug as follows: - Add a mechanism of depending operations to LSM IO operations. An operation finishes only after all of its depending operations have finished. - For the correlated merge policy, the flush/merge of the primary index depends on all flushes/merges of the secondary indexes. This ensures that when the correlated policy schedules a merge, all related components of all indexes are available to merge. 
Change-Id: Ib6c06ee23f3bfd16b758802388389c00e29780b1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2018 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Jianfeng Jia <jianfeng....@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/21ed0f72 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/21ed0f72 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/21ed0f72 Branch: refs/heads/master Commit: 21ed0f72681a20ccb6a654f9aa4d54b8d0ea9c5c Parents: 6496590 Author: luochen01 <cl...@uci.edu> Authored: Sun Oct 8 17:23:44 2017 -0700 Committer: Luo Chen <cl...@uci.edu> Committed: Tue Oct 10 08:42:36 2017 -0700 ---------------------------------------------------------------------- .../test/dataflow/ComponentRollbackTest.java | 16 ++--- .../context/CorrelatedPrefixMergePolicy.java | 48 +++++++++---- .../asterix/common/context/DatasetInfo.java | 10 +++ .../common/context/DatasetLifecycleManager.java | 25 ++++--- .../asterix/common/context/DatasetResource.java | 5 ++ .../context/PrimaryIndexOperationTracker.java | 76 ++++++++++++++++++-- .../CorrelatedPrefixMergePolicyTest.java | 2 +- .../am/lsm/btree/impls/ExternalBTree.java | 11 +-- .../lsm/btree/impls/ExternalBTreeWithBuddy.java | 11 +-- .../storage/am/lsm/btree/impls/LSMBTree.java | 6 +- .../lsm/btree/impls/LSMBTreeFlushOperation.java | 8 ++- .../lsm/btree/impls/LSMBTreeMergeOperation.java | 8 ++- .../impls/LSMBTreeWithBuddyMergeOperation.java | 7 +- .../storage/am/lsm/common/api/ILSMHarness.java | 8 ++- .../am/lsm/common/api/ILSMIOOperation.java | 13 ++++ .../storage/am/lsm/common/api/ILSMIndex.java | 6 +- .../am/lsm/common/api/ILSMIndexAccessor.java | 14 ++-- .../common/api/ILSMIndexOperationContext.java | 4 ++ 
.../lsm/common/impls/AbstractIoOperation.java | 35 ++++++++- .../am/lsm/common/impls/AbstractLSMIndex.java | 19 +++-- .../impls/AbstractLSMIndexOperationContext.java | 16 +++++ .../lsm/common/impls/ConstantMergePolicy.java | 4 +- .../lsm/common/impls/ExternalIndexHarness.java | 9 +-- .../am/lsm/common/impls/FlushOperation.java | 8 +-- .../storage/am/lsm/common/impls/LSMHarness.java | 40 +++++++++-- .../lsm/common/impls/LSMTreeIndexAccessor.java | 13 ++-- .../am/lsm/common/impls/MergeOperation.java | 17 ++--- .../am/lsm/common/impls/PrefixMergePolicy.java | 2 +- .../lsm/common/impls/ThreadCountingTracker.java | 2 +- .../am/lsm/common/impls/TracedIOOperation.java | 11 +++ .../invertedindex/impls/LSMInvertedIndex.java | 15 ++-- .../impls/LSMInvertedIndexAccessor.java | 14 ++-- .../impls/LSMInvertedIndexFlushOperation.java | 7 +- .../impls/LSMInvertedIndexMergeOperation.java | 7 +- .../am/lsm/rtree/impls/ExternalRTree.java | 12 ++-- .../storage/am/lsm/rtree/impls/LSMRTree.java | 4 +- .../lsm/rtree/impls/LSMRTreeFlushOperation.java | 8 ++- .../lsm/rtree/impls/LSMRTreeMergeOperation.java | 7 +- .../impls/LSMRTreeWithAntiMatterTuples.java | 4 +- .../am/lsm/btree/LSMBTreeFileManagerTest.java | 2 +- .../btree/LSMBTreeFilterMergeTestDriver.java | 4 +- .../am/lsm/btree/LSMBTreeMergeTestDriver.java | 2 +- ...MBTreeModificationOperationCallbackTest.java | 6 +- .../btree/LSMBTreeScanDiskComponentsTest.java | 8 +-- ...TreeUpdateInPlaceScanDiskComponentsTest.java | 4 +- .../am/lsm/btree/LSMBTreeUpdateInPlaceTest.java | 4 +- .../storage/am/lsm/btree/impl/TestLsmBtree.java | 10 +-- .../btree/multithread/LSMBTreeTestWorker.java | 2 +- .../lsm/common/test/PrefixMergePolicyTest.java | 2 +- .../LSMInvertedIndexMergeTest.java | 2 +- .../PartitionedLSMInvertedIndexMergeTest.java | 2 +- .../multithread/LSMInvertedIndexTestWorker.java | 2 +- .../am/lsm/rtree/LSMRTreeMergeTestDriver.java | 2 +- .../rtree/multithread/LSMRTreeTestWorker.java | 2 +- 
.../LSMRTreeWithAntiMatterTuplesTestWorker.java | 2 +- 55 files changed, 435 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index 76bec8c..5e0e072 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -132,9 +132,9 @@ public class ComponentRollbackTest { public void createIndex() throws Exception { List<List<String>> partitioningKeys = new ArrayList<>(); partitioningKeys.add(Collections.singletonList("key")); - dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, - NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, - partitioningKeys, null, null, null, false, null, false), + dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null, + null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, partitioningKeys, null, null, null, + false, null, false), null, DatasetType.INTERNAL, DATASET_ID, 0); PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST); @@ -448,7 +448,7 @@ public class ComponentRollbackTest { for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), 
mergedComponents); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); merger.waitUntilCount(1); // now that we enetered, we will rollback Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn)); @@ -635,7 +635,7 @@ public class ComponentRollbackTest { for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); @@ -707,7 +707,7 @@ public class ComponentRollbackTest { for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); @@ -736,7 +736,7 @@ public class ComponentRollbackTest { } private class Rollerback { - private Thread task; + private final Thread task; private Exception failure; public Rollerback(TestLsmBtree lsmBtree, Predicate<ILSMComponent> predicate) { @@ -766,7 +766,7 @@ public class ComponentRollbackTest { } private class Searcher { - private ExecutorService executor = Executors.newSingleThreadExecutor(); + private final ExecutorService executor = Executors.newSingleThreadExecutor(); private Future<Boolean> task; private volatile boolean entered = false; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index a20e660..e18181c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -31,6 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy; @@ -91,14 +92,17 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); int partition = getIndexPartition(index, indexInfos); - triggerScheduledMerge(minID, maxID, + List<ILSMIOOperation> dependingMerges = scheduleSecondaryIndexes(minID, maxID, indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet())); + + schedulePrimaryIndex(minID, maxID, index, dependingMerges); + return true; } /** * Submit merge requests for all disk components within [minID, maxID] - * of all indexes of a given dataset in the given partition + * of all of secondary indexes of a given dataset in the given partition * * @param minID * @param maxID @@ -106,17 +110,39 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { * @param indexInfos * @throws HyracksDataException */ - private void triggerScheduledMerge(long minID, long maxID, Set<IndexInfo> indexInfos) throws HyracksDataException { + private 
List<ILSMIOOperation> scheduleSecondaryIndexes(long minID, long maxID, Set<IndexInfo> indexInfos) + throws HyracksDataException { + List<ILSMIOOperation> mergeOps = new ArrayList<>(); for (IndexInfo info : indexInfos) { ILSMIndex lsmIndex = info.getIndex(); - - List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents()); - if (isMergeOngoing(immutableComponents)) { + List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents(); + if (lsmIndex.isPrimaryIndex() || isMergeOngoing(diskComponents)) { continue; } - List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); - for (ILSMDiskComponent component : immutableComponents) { - ILSMDiskComponentId id = component.getComponentId(); + List<ILSMDiskComponent> mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents); + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + mergeOps.add(accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergeableComponents, null)); + } + return mergeOps; + } + + private void schedulePrimaryIndex(long minID, long maxID, ILSMIndex primaryIndex, + List<ILSMIOOperation> dependingMerges) throws HyracksDataException { + assert primaryIndex.isPrimaryIndex(); + List<ILSMDiskComponent> diskComponents = primaryIndex.getDiskComponents(); + List<ILSMDiskComponent> mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents); + ILSMIndexAccessor accessor = + primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + accessor.scheduleMerge(primaryIndex.getIOOperationCallback(), mergeableComponents, dependingMerges); + } + + private List<ILSMDiskComponent> collectMergeableComponents(long minID, long maxID, + List<ILSMDiskComponent> diskComponents) throws HyracksDataException { + List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); + for (ILSMDiskComponent component : diskComponents) { + 
ILSMDiskComponentId id = component.getComponentId(); + if (!id.notFound()) { if (id.getMinId() >= minID && id.getMaxId() <= maxID) { mergableComponents.add(component); } @@ -126,10 +152,8 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { break; } } - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); } + return mergableComponents; } private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index 71d4a96..0df8dcc 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -34,6 +34,7 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { private boolean isRegistered; private boolean memoryAllocated; private boolean durable; + private boolean correlated; public DatasetInfo(int datasetID) { this.indexes = new HashMap<>(); @@ -41,6 +42,7 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { this.datasetID = datasetID; this.setRegistered(false); this.setMemoryAllocated(false); + this.setCorrelated(false); } @Override @@ -195,4 +197,12 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { public void setLastAccess(long lastAccess) { this.lastAccess = lastAccess; } + + public void setCorrelated(boolean correlated) { + this.correlated = correlated; + } + + public 
boolean isCorrelated() { + return correlated; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 37bd789..ad9f6a5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -223,7 +223,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC if (iInfo.isOpen()) { ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); + accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback(), null); } // Wait for the above flush op. @@ -417,16 +417,22 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } if (asyncFlush) { - for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - ILSMIndexAccessor accessor = - iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); - } + PrimaryIndexOperationTracker.flushDatasetIndexes(dsInfo.getDatsetIndexInfos(), dsInfo.isCorrelated()); } else { + List<IndexInfo> primaryIndexes = new ArrayList<>(); for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - // TODO: This is not efficient since we flush the indexes sequentially. - // Think of a way to allow submitting the flush requests concurrently. 
We don't do them concurrently because this - // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. + if (iInfo.getIndex().isPrimaryIndex()) { + // primary indexes are flushed later to guarantee the correctness of the correlated merge policy + primaryIndexes.add(iInfo); + } else { + // TODO: This is not efficient since we flush the indexes sequentially. + // Think of a way to allow submitting the flush requests concurrently. + // We don't do them concurrently because this may lead to a deadlock scenario + // between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. + flushAndWaitForIO(dsInfo, iInfo); + } + } + for (IndexInfo iInfo : primaryIndexes) { flushAndWaitForIO(dsInfo, iInfo); } } @@ -591,4 +597,5 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } } } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index f2f3b93..5eb3c02 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.LocalResource; @@ -95,6 +96,10 @@ public class DatasetResource 
implements Comparable<DatasetResource> { datasetInfo.setExternal(!index.hasMemoryComponents()); datasetInfo.setRegistered(true); datasetInfo.setDurable(((ILSMIndex) index).isDurable()); + //TODO use a general mechanism to set correlated property when we have more + // correlated merge policies + datasetInfo.setCorrelated( + ((AbstractLSMIndex) index).getMergePolicy() instanceof CorrelatedPrefixMergePolicy); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 67b25b6..4eb1c3a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -19,6 +19,9 @@ package org.apache.asterix.common.context; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -31,6 +34,7 @@ import org.apache.asterix.common.utils.TransactionUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; @@ -141,17 +145,16 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker { //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled. public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { - for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) { - //get resource - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + Set<IndexInfo> indexInfos = dsInfo.getDatsetIndexInfos(); + for (IndexInfo iInfo : indexInfos) { //update resource lsn AbstractLSMIOOperationCallback ioOpCallback = - (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); + (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); ioOpCallback.updateLastLSN(logRecord.getLSN()); - //schedule flush after update - accessor.scheduleFlush(lsmIndex.getIOOperationCallback()); } + + flushDatasetIndexes(indexInfos, dsInfo.isCorrelated()); + flushLogCreated = false; } @@ -198,4 +201,63 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { return flushLogCreated; } + public static void flushDatasetIndexes(Set<IndexInfo> indexes, boolean correlated) throws HyracksDataException { + if (!correlated) { + // if not correlated, we simply schedule flushes of each index independently + for (IndexInfo iInfo : indexes) { + ILSMIndex lsmIndex = iInfo.getIndex(); + //get resource + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + //schedule flush after update + accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null); + } + } else { + // otherwise, we need to schedule indexes properly s.t. 
the primary index would depend on + // all secondary indexes in the same partition + + // collect partitions + Set<Integer> partitions = new HashSet<>(); + indexes.forEach(iInfo -> partitions.add(iInfo.getPartition())); + for (Integer partition : partitions) { + flushCorrelatedDatasetIndexes(indexes, partition); + } + + } + } + + private static void flushCorrelatedDatasetIndexes(Set<IndexInfo> indexes, int partition) + throws HyracksDataException { + ILSMIndex primaryIndex = null; + List<ILSMIOOperation> flushOps = new ArrayList<>(); + for (IndexInfo iInfo : indexes) { + if (iInfo.getPartition() != partition) { + continue; + } + ILSMIndex lsmIndex = iInfo.getIndex(); + if (lsmIndex.isPrimaryIndex()) { + primaryIndex = lsmIndex; + } else { + //get resource + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + //schedule flush + ILSMIOOperation flushOp = accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null); + if (flushOp != null) { + flushOps.add(flushOp); + } + } + } + + if (primaryIndex != null) { + //get resource + ILSMIndexAccessor accessor = + primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + //schedule flush after update + accessor.scheduleFlush(primaryIndex.getIOOperationCallback(), flushOps); + + } + + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java index 9f071bb..ee795d5 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java +++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -236,7 +236,7 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { return null; } }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class), - Mockito.anyListOf(ILSMDiskComponent.class)); + Mockito.anyListOf(ILSMDiskComponent.class), Mockito.any()); Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class), Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index 3775985..f5c3013 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -56,6 +56,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICursorFactory; +import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; import org.apache.hyracks.storage.common.IIndexBulkLoader; import 
org.apache.hyracks.storage.common.IIndexCursor; @@ -170,7 +171,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { // The only reason to override the following method is that it uses a different context object // in addition, determining whether or not to keep deleted tuples is different here @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { ExternalBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, -1); opCtx.setOperation(IndexOperation.MERGE); @@ -195,9 +196,11 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName()); ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); - ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, cursor, + MergeOperation mergeOp = new LSMBTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath())); + callback, fileManager.getBaseDir().getAbsolutePath(), ctx.getDependingOps()); + ioScheduler.scheduleOperation(mergeOp); + return mergeOp; } // This function should only be used when a transaction fail. 
it doesn't @@ -369,7 +372,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { // Not supported @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-BTree"); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index ff17905..071a4bd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -319,7 +319,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd } @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { throw HyracksDataException.create(ErrorCode.FLUSH_NOT_SUPPORTED_IN_EXTERNAL_INDEX); } @@ -342,7 +342,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd } @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public 
ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { ILSMIndexOperationContext bctx = createOpContext(NoOpOperationCallback.INSTANCE, 0); bctx.setOperation(IndexOperation.MERGE); @@ -363,11 +363,12 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd keepDeleteTuples = mergingComponents.get(mergingComponents.size() - 1) != secondDiskComponents .get(secondDiskComponents.size() - 1); } - - ioScheduler.scheduleOperation( + ILSMIOOperation mergeOp = new LSMBTreeWithBuddyMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples)); + callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples, ctx.getDependingOps()); + ioScheduler.scheduleOperation(mergeOp); + return mergeOp; } // This method creates the appropriate opContext for the targeted version http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index c7d45e1..00a489e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -462,7 +462,8 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { 
LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) { ILSMIndexAccessor accessor = createAccessor(opCtx); return new LSMBTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), - componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); + componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), + opCtx.getDependingOps()); } @Override @@ -611,6 +612,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { } ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples); return new LSMBTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), - mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); + mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), + opCtx.getDependingOps()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java index e3424e5..92b53de 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java @@ -18,7 +18,10 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impls; +import java.util.List; + 
import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; @@ -27,8 +30,9 @@ public class LSMBTreeFlushOperation extends FlushOperation { private final FileReference bloomFilterFlushTarget; public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, - FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, flushTarget, callback, indexIdentifier); + FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier, + List<ILSMIOOperation> dependingOps) { + super(accessor, flushTarget, callback, indexIdentifier, dependingOps); this.bloomFilterFlushTarget = bloomFilterFlushTarget; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java index ec96303..40ac7b1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java @@ -19,8 +19,11 @@ package org.apache.hyracks.storage.am.lsm.btree.impls; +import java.util.List; + import 
org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -30,8 +33,9 @@ public class LSMBTreeMergeOperation extends MergeOperation { private final FileReference bloomFilterMergeTarget; public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, - FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, target, callback, indexIdentifier, cursor); + FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier, + List<ILSMIOOperation> dependingOps) { + super(accessor, target, callback, indexIdentifier, cursor, dependingOps); this.bloomFilterMergeTarget = bloomFilterMergeTarget; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java index f682bde..e0c1512 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java @@ -18,8 +18,11 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impls; +import java.util.List; + import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -32,8 +35,8 @@ public class LSMBTreeWithBuddyMergeOperation extends MergeOperation { public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, FileReference buddyBtreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, - String indexIdentifier, boolean keepDeletedTuples) { - super(accessor, target, callback, indexIdentifier, cursor); + String indexIdentifier, boolean keepDeletedTuples, List<ILSMIOOperation> dependingOps) { + super(accessor, target, callback, indexIdentifier, cursor, dependingOps); this.buddyBtreeMergeTarget = buddyBtreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; this.keepDeletedTuples = keepDeletedTuples; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 89c8cb9..380b2f2 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -105,10 +105,12 @@ public interface ILSMHarness { * * @param ctx * @param callback + * @return The scheduled merge operation, used for the caller to track its status * @throws HyracksDataException * @throws IndexException */ - void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; + ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + throws HyracksDataException; /** * Schedule full merge @@ -135,9 +137,11 @@ public interface ILSMHarness { * * @param ctx * @param callback + * @return The scheduled flush operation, used for the caller to track its status * @throws HyracksDataException */ - void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; + ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + throws HyracksDataException; /** * Perform a flush http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java index c2ae786..ff1613b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.storage.am.lsm.common.api; +import java.util.List; import java.util.concurrent.Callable; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -63,7 +64,19 @@ public interface ILSMIOOperation extends Callable<Boolean> { FileReference getTarget(); /** + * <<<<<<< HEAD + * * @return the accessor of the operation */ ILSMIndexAccessor getAccessor(); + + /** + * @return whether this operation has finished + */ + boolean isFinished(); + + /** + * @return a list of operations that this operation depends on + */ + List<ILSMIOOperation> getDependingOps(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java index addeb27..5f43bcd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java @@ -67,11 +67,13 @@ public interface ILSMIndex extends IIndex { public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException; - void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; + ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + throws 
HyracksDataException; ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException; - void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; + ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + throws HyracksDataException; ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java index b8d64af..b303a39 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java @@ -46,9 +46,13 @@ public interface ILSMIndexAccessor extends IIndexAccessor { * * @param callback * the IO operation callback + * @param dependingOps + * other operations that this operation depends on + * @return The scheduled flush operation * @throws HyracksDataException */ - void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException; + ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps) + throws HyracksDataException; /** * Schedule a merge operation @@ -57,11 +61,13 @@ public interface ILSMIndexAccessor extends IIndexAccessor { * the merge operation callback * @param components * the components to be merged + * 
@param dependingOps + * other operations that this operation depends on + * @return The scheduled merge operation * @throws HyracksDataException - * @throws IndexException */ - void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components) - throws HyracksDataException; + ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components, + List<ILSMIOOperation> dependingOps) throws HyracksDataException; /** * Schedule a full merge http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java index 5b0378a..66af93f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java @@ -56,4 +56,8 @@ public interface ILSMIndexOperationContext extends IIndexOperationContext { PermutingTupleReference getFilterTuple(); MultiComparator getFilterCmp(); + + List<ILSMIOOperation> getDependingOps(); + + void setDependingOps(List<ILSMIOOperation> dependingOps); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java index aee46f0..1d13c94 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java @@ -18,6 +18,10 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; @@ -30,13 +34,32 @@ public abstract class AbstractIoOperation implements ILSMIOOperation { protected final FileReference target; protected final ILSMIOOperationCallback callback; protected final String indexIdentifier; + protected final List<ILSMIOOperation> dependingOps; + + protected AtomicBoolean isFinished = new AtomicBoolean(false); public AbstractIoOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback, - String indexIdentifier) { + String indexIdentifier, List<ILSMIOOperation> dependingOps) { this.accessor = accessor; this.target = target; this.callback = callback; this.indexIdentifier = indexIdentifier; + this.dependingOps = dependingOps; + } + + protected abstract void callInternal() throws HyracksDataException; + + @Override + public Boolean call() throws HyracksDataException { + try { + callInternal(); + } finally { + synchronized (this) { + isFinished.set(true); + notifyAll(); + } + } + return true; } @Override @@ -63,4 +86,14 @@ public 
abstract class AbstractIoOperation implements ILSMIOOperation { public String getIndexIdentifier() { return indexIdentifier; } + + @Override + public List<ILSMIOOperation> getDependingOps() { + return dependingOps; + } + + @Override + public boolean isFinished() { + return isFinished.get(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index dc64f9b..e7b21cb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -86,6 +86,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { protected final int[] treeFields; protected final int[] filterFields; protected final boolean durable; + protected final ILSMMergePolicy mergePolicy; protected boolean isActive; protected final AtomicBoolean[] flushRequests; protected boolean memoryComponentsAllocated = false; @@ -113,6 +114,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { this.inactiveDiskComponents = new LinkedList<>(); this.durable = durable; this.tracer = tracer; + this.mergePolicy = mergePolicy; lsmHarness = new LSMHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(), tracer); isActive = false; diskComponents = new ArrayList<>(); @@ -135,6 +137,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { 
this.ioScheduler = ioScheduler; this.ioOpCallback = ioOpCallback; this.durable = durable; + this.mergePolicy = mergePolicy; lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled()); isActive = false; diskComponents = new LinkedList<>(); @@ -199,7 +202,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { protected void flushMemoryComponent() throws HyracksDataException { BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback); ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(cb); + accessor.scheduleFlush(cb, null); try { cb.waitForIO(); } catch (InterruptedException e) { @@ -326,19 +329,21 @@ public abstract class AbstractLSMIndex implements ILSMIndex { } @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); opCtx.setOperation(ctx.getOperation()); opCtx.getComponentHolder().addAll(ctx.getComponentHolder()); + opCtx.setDependingOps(ctx.getDependingOps()); ILSMIOOperation flushOp = createFlushOperation(opCtx, componentFileRefs, callback); ioScheduler.scheduleOperation(TracedIOOperation.wrap(flushOp, tracer)); + return flushOp; } @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { List<ILSMComponent> mergingComponents = ctx.getComponentHolder(); // merge must create a different op ctx @@ -346,11 +351,13 @@ public 
abstract class AbstractLSMIndex implements ILSMIndex { createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); opCtx.setOperation(ctx.getOperation()); opCtx.getComponentHolder().addAll(mergingComponents); + opCtx.setDependingOps(ctx.getDependingOps()); ILSMDiskComponent firstComponent = (ILSMDiskComponent) mergingComponents.get(0); ILSMDiskComponent lastComponent = (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1); LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent); - ILSMIOOperation mergeOp = createMergeOperation(opCtx, mergeFileRefs, callback); + MergeOperation mergeOp = (MergeOperation) createMergeOperation(opCtx, mergeFileRefs, callback); ioScheduler.scheduleOperation(TracedIOOperation.wrap(mergeOp, tracer)); + return mergeOp; } private void addOperationalMutableComponents(List<ILSMComponent> operationalComponents) { @@ -628,6 +635,10 @@ public abstract class AbstractLSMIndex implements ILSMIndex { : doMerge(operation); } + public ILSMMergePolicy getMergePolicy() { + return mergePolicy; + } + public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent); protected abstract void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java index 065d465..5fdeafd 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java @@ -27,6 +27,7 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; @@ -47,6 +48,7 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera protected IndexOperation op; protected boolean accessingComponents = false; protected ISearchPredicate searchPredicate; + protected final List<ILSMIOOperation> dependingOps; public AbstractLSMIndexOperationContext(int[] treeFields, int[] filterFields, IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback, @@ -56,6 +58,7 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera this.componentHolder = new LinkedList<>(); this.componentsToBeMerged = new LinkedList<>(); this.componentsToBeReplicated = new LinkedList<>(); + this.dependingOps = new LinkedList<>(); if (filterFields != null) { indexTuple = new PermutingTupleReference(treeFields); filterCmp = MultiComparator.create(filterCmpFactories); @@ -153,4 +156,17 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera public ISearchPredicate getSearchPredicate() { return searchPredicate; } + + @Override + public List<ILSMIOOperation> 
getDependingOps() { + return dependingOps; + } + + @Override + public void setDependingOps(List<ILSMIOOperation> dependingOps) { + this.dependingOps.clear(); + if (dependingOps != null) { + this.dependingOps.addAll(dependingOps); + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java index 847b882..7ac9bfb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java @@ -49,7 +49,7 @@ public class ConstantMergePolicy implements ILSMMergePolicy { } else if (immutableComponents.size() >= numComponents) { ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); + accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null); } } @@ -106,7 +106,7 @@ public class ConstantMergePolicy implements ILSMMergePolicy { } ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); + accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null); return true; } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index 2f65b18..b93d943 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -203,13 +203,13 @@ public class ExternalIndexHarness extends LSMHarness { } @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { callback.afterFinalize(LSMOperationType.MERGE, null); - return; + return null; } - lsmIndex.scheduleMerge(ctx, callback); + return lsmIndex.scheduleMerge(ctx, callback); } @Override @@ -297,9 +297,10 @@ public class ExternalIndexHarness extends LSMHarness { } @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { callback.afterFinalize(LSMOperationType.FLUSH, null); + return null; } @Override 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java index 7b7f950..750c690 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.util.List; import java.util.Objects; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -30,14 +31,13 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; public class FlushOperation extends AbstractIoOperation implements Comparable<ILSMIOOperation> { public FlushOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback, - String indexIdentifier) { - super(accessor, target, callback, indexIdentifier); + String indexIdentifier, List<ILSMIOOperation> dependingOps) { + super(accessor, target, callback, indexIdentifier, dependingOps); } @Override - public Boolean call() throws HyracksDataException { + protected void callInternal() throws HyracksDataException { accessor.flush(this); - return true; } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 1069f8f..bc3a5a1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -500,13 +500,13 @@ public class LSMHarness implements ILSMHarness { } @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) { callback.afterFinalize(LSMOperationType.FLUSH, null); - return; + return null; } - lsmIndex.scheduleFlush(ctx, callback); + return lsmIndex.scheduleFlush(ctx, callback); } @Override @@ -519,6 +519,7 @@ public class LSMHarness implements ILSMHarness { boolean failedOperation = false; try { newComponent = lsmIndex.flush(operation); + waitForDependingOps(operation); operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent); newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { @@ -537,13 +538,13 @@ public class LSMHarness implements ILSMHarness { } @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { callback.afterFinalize(LSMOperationType.MERGE, null); - return; + 
return null; } - lsmIndex.scheduleMerge(ctx, callback); + return lsmIndex.scheduleMerge(ctx, callback); } @Override @@ -570,6 +571,7 @@ public class LSMHarness implements ILSMHarness { boolean failedOperation = false; try { newComponent = lsmIndex.merge(operation); + waitForDependingOps(operation); operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent); newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { @@ -754,6 +756,32 @@ public class LSMHarness implements ILSMHarness { } } + /** + * Wait for depending operations to finish. + * + * @param op + */ + private void waitForDependingOps(ILSMIOOperation op) throws HyracksDataException { + List<ILSMIOOperation> dependingOps = op.getDependingOps(); + if (dependingOps == null) { + return; + } + for (ILSMIOOperation dependingOp : dependingOps) { + if (dependingOp != null && !dependingOp.isFinished()) { + synchronized (dependingOp) { + while (!dependingOp.isFinished()) { + try { + dependingOp.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + } + } + } + } + } + @Override public void deleteComponents(ILSMIndexOperationContext ctx, Predicate<ILSMComponent> predicate) throws HyracksDataException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index c0fd443..f008fde 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -135,18 +135,21 @@ public class LSMTreeIndexAccessor implements ILSMIndexAccessor { } @Override - public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException { + public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps) + throws HyracksDataException { ctx.setOperation(IndexOperation.FLUSH); - lsmHarness.scheduleFlush(ctx, callback); + ctx.setDependingOps(dependingOps); + return lsmHarness.scheduleFlush(ctx, callback); } @Override - public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components) - throws HyracksDataException { + public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components, + List<ILSMIOOperation> dependingOps) throws HyracksDataException { ctx.setOperation(IndexOperation.MERGE); ctx.getComponentsToBeMerged().clear(); ctx.getComponentsToBeMerged().addAll(components); - lsmHarness.scheduleMerge(ctx, callback); + ctx.setDependingOps(dependingOps); + return lsmHarness.scheduleMerge(ctx, callback); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java index c83d534..2210fd0 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.common.IIndexCursor; @@ -31,8 +32,8 @@ public class MergeOperation extends AbstractIoOperation { protected final IIndexCursor cursor; public MergeOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback, - String indexIdentifier, IIndexCursor cursor) { - super(accessor, target, callback, indexIdentifier); + String indexIdentifier, IIndexCursor cursor, List<ILSMIOOperation> dependingOps) { + super(accessor, target, callback, indexIdentifier, dependingOps); this.cursor = cursor; } @@ -41,12 +42,6 @@ public class MergeOperation extends AbstractIoOperation { } @Override - public Boolean call() throws HyracksDataException { - accessor.merge(this); - return true; - } - - @Override public LSMIOOpertionType getIOOpertionType() { return LSMIOOpertionType.MERGE; } @@ -54,4 +49,10 @@ public class MergeOperation extends AbstractIoOperation { public IIndexCursor getCursor() { return cursor; } + + @Override + protected void callInternal() throws HyracksDataException { + accessor.merge(this); + + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java index 7d7266e..f159232 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java @@ -247,7 +247,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy { Collections.reverse(mergableComponents); ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents); + accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents, null); } /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java index 85081a1..7cdcc52 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java @@ -59,7 +59,7 @@ public class ThreadCountingTracker implements ILSMOperationTracker { && index.hasFlushRequestForCurrentMutableComponent()) { ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java index 9cc8022..3e35e51 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java @@ -19,6 +19,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.util.List; import java.util.logging.Logger; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -100,6 +101,16 @@ class TracedIOOperation implements ILSMIOOperation { public ILSMIndexAccessor getAccessor() { return ioOp.getAccessor(); } + + @Override + public boolean isFinished() { + return ioOp.isFinished(); + } + + @Override + public List<ILSMIOOperation> getDependingOps() { + return ioOp.getDependingOps(); + 
} } class ComparableTracedIOOperation extends TracedIOOperation implements Comparable<ILSMIOOperation> { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java index eb3924c..60da50a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java @@ -691,17 +691,20 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getLsmHarness(), opCtx), - componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), - componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); + componentFileRefs.getInsertIndexFileReference(), + componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), + callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps()); } @Override protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx, - LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws 
HyracksDataException { + LSMComponentFileReferences mergeFileRefs, + ILSMIOOperationCallback callback) throws HyracksDataException { ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getLsmHarness(), opCtx); IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx); - return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), - mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback, - fileManager.getBaseDir().getAbsolutePath()); + return new LSMInvertedIndexMergeOperation(accessor, cursor, + mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(), + mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), + opCtx.getDependingOps()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/21ed0f72/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index 61fc84e..242bc83 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -85,9 +85,11 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd } @Override - public void scheduleFlush(ILSMIOOperationCallback callback) 
throws HyracksDataException { + public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps) + throws HyracksDataException { ctx.setOperation(IndexOperation.FLUSH); - lsmHarness.scheduleFlush(ctx, callback); + ctx.setDependingOps(dependingOps); + return lsmHarness.scheduleFlush(ctx, callback); } @Override @@ -96,12 +98,13 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd } @Override - public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components) - throws HyracksDataException { + public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components, + List<ILSMIOOperation> dependingOps) throws HyracksDataException { ctx.setOperation(IndexOperation.MERGE); ctx.getComponentsToBeMerged().clear(); ctx.getComponentsToBeMerged().addAll(components); - lsmHarness.scheduleMerge(ctx, callback); + ctx.setDependingOps(dependingOps); + return lsmHarness.scheduleMerge(ctx, callback); } @Override @@ -116,6 +119,7 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd @Override public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException { ctx.setOperation(IndexOperation.FULL_MERGE); + ctx.setDependingOps(null); lsmHarness.scheduleFullMerge(ctx, callback); }