Murtadha Hubail has submitted this change and it was merged. Change subject: Introducing data replication API to LSM indexes ......................................................................
Introducing data replication API to LSM indexes Change-Id: I80565fc9d74e30440d2df5917911904ba8f33c25 Reviewed-on: https://asterix-gerrit.ics.uci.edu/322 Tested-by: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> --- A hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IIOReplicationManager.java A hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IReplicationJob.java A hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/impl/AbstractReplicationJob.java M hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java M hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java M hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java M hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java M hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java M hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java M hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java M hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java M hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java M hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java A hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java A hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java M hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java M hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java M hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java M hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java M hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java M hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java M hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java M hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java M hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java M hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java M hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java M hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java M hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java 40 files changed, 563 insertions(+), 17 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IIOReplicationManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IIOReplicationManager.java new file mode 100644 index 0000000..d2e7011 --- /dev/null +++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IIOReplicationManager.java @@ -0,0 +1,27 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.api.replication; + +import java.io.IOException; + +import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent; + +public interface IIOReplicationManager extends ILifeCycleComponent { + + public void submitJob(IReplicationJob job) throws IOException; + + public boolean isReplicationEnabled(); + +} diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IReplicationJob.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IReplicationJob.java new file mode 100644 index 0000000..4d3b58d --- /dev/null +++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IReplicationJob.java @@ -0,0 +1,45 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.api.replication; + +import java.util.Set; + +public interface IReplicationJob { + + public enum ReplicationJobType { + LSM_COMPONENT, + METADATA + } + + public enum ReplicationOperation { + REPLICATE, + DELETE, + STOP + } + + public enum ReplicationExecutionType { + ASYNC, + SYNC + } + + public ReplicationJobType getJobType(); + + public ReplicationOperation getOperation(); + + public ReplicationExecutionType getExecutionType(); + + public Set<String> getJobFiles(); + +} diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/impl/AbstractReplicationJob.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/impl/AbstractReplicationJob.java new file mode 100644 index 0000000..f73c9e1 --- /dev/null +++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/impl/AbstractReplicationJob.java @@ -0,0 +1,54 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.api.replication.impl; + +import java.util.Set; + +import edu.uci.ics.hyracks.api.replication.IReplicationJob; + +public abstract class AbstractReplicationJob implements IReplicationJob { + + private final Set<String> filesToReplicate; + private final ReplicationOperation operation; + private final ReplicationExecutionType executionType; + private final ReplicationJobType jobType; + + public AbstractReplicationJob(ReplicationJobType jobType, ReplicationOperation operation, ReplicationExecutionType executionType, Set<String> filesToReplicate){ + this.jobType = jobType; + this.operation = operation; + this.executionType = executionType; + this.filesToReplicate = filesToReplicate; + } + + @Override + public Set<String> getJobFiles() { + return filesToReplicate; + } + + @Override + public ReplicationOperation getOperation() { + return operation; + } + + @Override + public ReplicationExecutionType getExecutionType() { + return executionType; + } + + @Override + public ReplicationJobType getJobType() { + return jobType; + } +} diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java index 938464f..de4ce5a 100644 --- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java +++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java @@ -30,4 +30,5 @@ public void close(long resourceID) throws HyracksDataException; public List<IIndex> getOpenIndexes(); + } \ No newline at end of file diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java index d23d484..5cc7602 100644 --- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java +++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java @@ -41,7 +41,7 @@ // We store the page id that will be used to store the information of the the filter that is associated with a disk component. // It is only set in the first meta page other meta pages (i.e., with level -2) have junk in the max page field. private static final int additionalFilteringPageOff = validOff + 4; // 29 - protected static final int lsnOff = additionalFilteringPageOff + 4; // 33 + public static final int lsnOff = additionalFilteringPageOff + 4; // 33 protected ICachedPage page = null; protected ByteBuffer buf = null; diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java index b5c0a1d..9d00adb 100644 --- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java +++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java @@ -26,5 +26,6 @@ NOOP, MERGE, FULL_MERGE, - FLUSH + FLUSH, + REPLICATE } diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java index 5b3ccb7..1fdfe28 100644 --- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java +++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java @@ -19,7 +19,9 @@ public class IndexFileNameUtil { + public static final String IO_DEVICE_NAME_PREFIX = "device_id_"; + public static String prepareFileName(String path, int ioDeviceId) { - return path + File.separator + "device_id_" + ioDeviceId; + return path + File.separator + IO_DEVICE_NAME_PREFIX + ioDeviceId; } } \ No newline at end of file diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index 088a15b..f350e51 100644 --- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -440,6 +440,9 @@ case FULL_MERGE: operationalComponents.addAll(immutableComponents); break; + case REPLICATE: + operationalComponents.addAll(ctx.getComponentsToBeReplicated()); + break; default: throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported."); } diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java index 68ec7d1..a51e460 100644 --- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java +++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java @@ -39,6 +39,7 @@ public final ISearchOperationCallback searchCallback; private final List<ILSMComponent> componentHolder; private final List<ILSMComponent> componentsToBeMerged; + private final List<ILSMComponent> componentsToBeReplicated; private final int targetIndexVersion; public ISearchPredicate searchPredicate; @@ -63,6 +64,7 @@ } this.componentHolder = new LinkedList<ILSMComponent>(); this.componentsToBeMerged = new LinkedList<ILSMComponent>(); + this.componentsToBeReplicated = new LinkedList<ILSMComponent>(); this.searchCallback = searchCallback; this.targetIndexVersion = targetIndexVersion; } @@ -77,6 +79,7 @@ public void reset() { componentHolder.clear(); componentsToBeMerged.clear(); + componentsToBeReplicated.clear(); } public IndexOperation getOperation() { @@ -123,5 +126,10 @@ public ISearchPredicate getSearchPredicate() { return searchPredicate; } + + @Override + public List<ILSMComponent> getComponentsToBeReplicated() { + return componentsToBeReplicated; + } } \ No newline at end of file diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index 1a4b85f..184ac9b 100644 --- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -15,8 +15,10 @@ package edu.uci.ics.hyracks.storage.am.lsm.btree.impls; import java.io.IOException; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; @@ -479,6 +481,8 @@ break; case FULL_MERGE: operationalComponents.addAll(immutableComponents); + case REPLICATE: + operationalComponents.addAll(ctx.getComponentsToBeReplicated()); break; case FLUSH: // Do nothing. this is left here even though the index never @@ -878,4 +882,16 @@ public boolean isPrimaryIndex() { return false; } + + @Override + public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) { + Set<String> files = new HashSet<String>(); + + LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) lsmComponent; + files.add(component.getBTree().getFileReference().toString()); + files.add(component.getBuddyBTree().getFileReference().toString()); + files.add(component.getBloomFilter().getFileReference().toString()); + + return files; + } } \ No newline at end of file diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java index 645810a..e898e3b 100644 --- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java +++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java @@ -32,6 +32,7 @@ private MultiComparator buddyBTreeCmp; public final List<ILSMComponent> componentHolder; private final List<ILSMComponent> componentsToBeMerged; + private final List<ILSMComponent> componentsToBeReplicated; public final ISearchOperationCallback searchCallback; private final int targetIndexVersion; public ISearchPredicate searchPredicate; @@ -42,6 +43,7 @@ this.componentHolder = new LinkedList<ILSMComponent>(); this.componentsToBeMerged = new LinkedList<ILSMComponent>(); + this.componentsToBeReplicated = new LinkedList<ILSMComponent>(); this.searchCallback = searchCallback; this.targetIndexVersion = targetIndexVersion; this.bTreeCmp = MultiComparator.create(btreeCmpFactories); @@ -62,6 +64,7 @@ public void reset() { componentHolder.clear(); componentsToBeMerged.clear(); + componentsToBeReplicated.clear(); } @Override @@ -111,4 +114,9 @@ public ISearchPredicate getSearchPredicate() { return searchPredicate; } + + @Override + public List<ILSMComponent> getComponentsToBeReplicated() { + return componentsToBeReplicated; + } } \ No newline at end of file diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index 6d2f78c..b1b6837 100644 --- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -17,7 +17,9 @@ import java.io.File; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; @@ -331,6 +333,9 @@ break; case FULL_MERGE: operationalComponents.addAll(immutableComponents); + break; + case REPLICATE: + operationalComponents.addAll(ctx.getComponentsToBeReplicated()); break; default: throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported."); @@ -845,4 +850,16 @@ public boolean isPrimaryIndex() { return needKeyDupCheck; } + + @Override + public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) { + + Set<String> files = new HashSet<String>(); + LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) lsmComponent; + + files.add(component.getBTree().getFileReference().toString()); + files.add(component.getBloomFilter().getFileReference().toString()); + + return files; + } } diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java index a2598ea..419a765 100644 --- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java +++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java @@ -51,6 +51,7 @@ public final ISearchOperationCallback searchCallback; private final List<ILSMComponent> componentHolder; private final List<ILSMComponent> componentsToBeMerged; + private final List<ILSMComponent> componentsToBeReplicated; public final PermutingTupleReference indexTuple; public final MultiComparator filterCmp; public final PermutingTupleReference filterTuple; @@ -92,6 +93,7 @@ } this.componentHolder = new LinkedList<ILSMComponent>(); this.componentsToBeMerged = new LinkedList<ILSMComponent>(); + this.componentsToBeReplicated = new LinkedList<ILSMComponent>(); this.modificationCallback = modificationCallback; this.searchCallback = searchCallback; @@ -126,6 +128,7 @@ public void reset() { componentHolder.clear(); componentsToBeMerged.clear(); + componentsToBeReplicated.clear(); } public IndexOperation getOperation() { @@ -187,4 +190,9 @@ public ISearchPredicate getSearchPredicate() { return searchPredicate; } + + @Override + public List<ILSMComponent> getComponentsToBeReplicated() { + return componentsToBeReplicated; + } } \ No newline at end of file diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 1903998..a5e57c1 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -15,6 +15,8 @@ package edu.uci.ics.hyracks.storage.am.lsm.common.api; +import java.util.List; + import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference; import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor; @@ -52,4 +54,10 @@ public void addBulkLoadedComponent(ILSMComponent index) throws HyracksDataException, IndexException; public ILSMOperationTracker getOperationTracker(); + + public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload) + throws HyracksDataException; + + public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException; + } diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java index 36a2ca1..2b0d64b 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java @@ -114,4 +114,6 @@ public void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException; public void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException; + + public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException; } diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java index be67611..04f7760 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java @@ -18,6 +18,7 @@ import java.util.List; import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import edu.uci.ics.hyracks.api.replication.IReplicationJob.ReplicationOperation; import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference; import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor; import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext; @@ -80,5 +81,8 @@ public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException; public boolean isCurrentMutableComponentEmpty() throws HyracksDataException; + + public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload, + ReplicationOperation operation) throws HyracksDataException; } diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java index 954ae1b..86a145c 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java @@ -35,4 +35,6 @@ public void setSearchPredicate(ISearchPredicate searchPredicate); public ISearchPredicate getSearchPredicate(); + + public List<ILSMComponent> getComponentsToBeReplicated(); } diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java new file mode 100644 index 0000000..1791084 --- /dev/null +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java @@ -0,0 +1,23 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.storage.am.lsm.common.api; + +import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import edu.uci.ics.hyracks.api.replication.IReplicationJob; + +public interface ILSMIndexReplicationJob extends IReplicationJob { + + public void endReplication() throws HyracksDataException; +} diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java index 8c1d826..d8dd7ea 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java @@ -37,6 +37,7 @@ switch (opType) { case FORCE_MODIFICATION: case MODIFICATION: + case REPLICATE: case SEARCH: readerCount++; break; @@ -68,6 +69,7 @@ } case FORCE_MODIFICATION: case MODIFICATION: + case REPLICATE: case SEARCH: readerCount--; if (readerCount == 0 && state == ComponentState.READABLE_MERGING) { diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index a14e3a7..d947155 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -15,13 +15,18 @@ package edu.uci.ics.hyracks.storage.am.lsm.common.impls; +import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import edu.uci.ics.hyracks.api.replication.IReplicationJob.ReplicationExecutionType; +import edu.uci.ics.hyracks.api.replication.IReplicationJob.ReplicationOperation; import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex; import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -32,6 +37,7 @@ import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal; +import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; @@ -85,7 +91,7 @@ this.filterFields = filterFields; this.inactiveDiskComponents = new LinkedList<ILSMComponent>(); this.durable = durable; - lsmHarness = new LSMHarness(this, mergePolicy, opTracker); + lsmHarness = new LSMHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled()); isActivated = false; diskComponents = new ArrayList<ILSMComponent>(); memoryComponents = new ArrayList<ILSMComponent>(); @@ -108,7 +114,7 @@ this.ioScheduler = ioScheduler; this.ioOpCallback = ioOpCallback; this.durable = durable; - lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker); + lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled()); isActivated = false; diskComponents = new LinkedList<ILSMComponent>(); this.inactiveDiskComponents = new LinkedList<ILSMComponent>(); @@ -293,4 +299,35 @@ public void addInactiveDiskComponent(ILSMComponent diskComponent) { inactiveDiskComponents.add(diskComponent); } + + public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent); + + @Override + public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload, + ReplicationOperation operation) throws HyracksDataException { + //get set of files to be replicated for this component + Set<String> componentFiles = new HashSet<String>(); + + //get set of files to be replicated for each component + for (ILSMComponent lsmComponent : lsmComponents) { + componentFiles.addAll(getLSMComponentPhysicalFiles(lsmComponent)); + } + + ReplicationExecutionType executionType; + if (bulkload) { + executionType = ReplicationExecutionType.SYNC; + } else { + executionType = ReplicationExecutionType.ASYNC; + } + + //create replication job and submit it + LSMIndexReplicationJob job = new LSMIndexReplicationJob(this, ctx, componentFiles, operation, + executionType); + try { + diskBufferCache.getIIOReplicationManager().submitJob(job); + } catch (IOException e) { + throw new HyracksDataException(e); + } + + } } diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java index 1d93331..c45ce58 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java @@ -82,6 +82,7 @@ } } break; + case REPLICATE: case SEARCH: if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) { @@ -126,6 +127,7 @@ } } break; + case REPLICATE: case SEARCH: readerCount--; if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) { diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index d82489e..7bf1c40 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -19,6 +19,7 @@ import java.util.logging.Logger; import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import edu.uci.ics.hyracks.api.replication.IReplicationJob.ReplicationOperation; import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference; import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor; import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate; @@ -36,8 +37,8 @@ public class ExternalIndexHarness extends LSMHarness { private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName()); - public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) { - super(lsmIndex, mergePolicy, opTracker); + public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, boolean replicationEnabled) { + super(lsmIndex, mergePolicy, opTracker, replicationEnabled); } @Override @@ -111,6 +112,11 @@ c.threadExit(opType, failedOperation, false); switch (c.getState()) { case INACTIVE: + if (replicationEnabled) { + componentsToBeReplicated.clear(); + componentsToBeReplicated.add(c); + lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE); + } ((AbstractDiskLSMComponent) c).destroy(); break; default: @@ -124,6 +130,11 @@ if (newComponent != null) { beforeSubsumeMergedComponents(newComponent, ctx.getComponentHolder()); lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder()); + if (replicationEnabled) { + componentsToBeReplicated.clear(); + componentsToBeReplicated.add(newComponent); + triggerReplication(componentsToBeReplicated, false); + } mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get()); } break; @@ -223,6 +234,11 @@ lsmIndex.markAsValid(c); synchronized (opTracker) { lsmIndex.addComponent(c); + if (replicationEnabled) { + componentsToBeReplicated.clear(); + componentsToBeReplicated.add(c); + triggerReplication(componentsToBeReplicated, true); + } // Enter the component enterComponent(c); mergePolicy.diskComponentAdded(lsmIndex, false); @@ -311,6 +327,11 @@ private void exitComponent(ILSMComponent diskComponent) throws HyracksDataException { diskComponent.threadExit(LSMOperationType.SEARCH, false, false); if (diskComponent.getState() == ILSMComponent.ComponentState.INACTIVE) { + if (replicationEnabled) { + componentsToBeReplicated.clear(); + componentsToBeReplicated.add(diskComponent); + lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE); + } ((AbstractDiskLSMComponent) diskComponent).destroy(); } } diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java index d861404..a5f3e33 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -15,6 +15,7 @@ package edu.uci.ics.hyracks.storage.am.lsm.common.impls; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -22,10 +23,12 @@ import java.util.logging.Logger; import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import edu.uci.ics.hyracks.api.replication.IReplicationJob.ReplicationOperation; import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference; import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor; import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate; import edu.uci.ics.hyracks.storage.am.common.api.IndexException; +import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback; import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; @@ -33,6 +36,7 @@ import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; +import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; @@ -45,12 +49,18 @@ protected final ILSMMergePolicy mergePolicy; protected final ILSMOperationTracker opTracker; protected final AtomicBoolean fullMergeIsRequested; - - public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) { + protected final boolean replicationEnabled; + protected List<ILSMComponent> componentsToBeReplicated; + + public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, boolean replicationEnabled) { this.lsmIndex = lsmIndex; this.opTracker = opTracker; this.mergePolicy = mergePolicy; fullMergeIsRequested = new AtomicBoolean(); + this.replicationEnabled = replicationEnabled; + if (replicationEnabled) { + this.componentsToBeReplicated = new ArrayList<ILSMComponent>(); + } } protected boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, @@ -213,6 +223,11 @@ // newComponent is null if the flush op. was not performed. if (newComponent != null) { lsmIndex.addComponent(newComponent); + if (replicationEnabled) { + componentsToBeReplicated.clear(); + componentsToBeReplicated.add(newComponent); + triggerReplication(componentsToBeReplicated, false); + } mergePolicy.diskComponentAdded(lsmIndex, false); } break; @@ -220,6 +235,11 @@ // newComponent is null if the merge op. was not performed. if (newComponent != null) { lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder()); + if (replicationEnabled) { + componentsToBeReplicated.clear(); + componentsToBeReplicated.add(newComponent); + triggerReplication(componentsToBeReplicated, false); + } mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get()); } break; @@ -269,6 +289,12 @@ */ if (inactiveDiskComponentsToBeDeleted != null) { try { + //schedule a replication job to delete these inactive disk components from replicas + if (replicationEnabled) { + lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, false, + ReplicationOperation.DELETE); + } + for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) { ((AbstractDiskLSMComponent) c).destroy(); } @@ -427,6 +453,11 @@ lsmIndex.markAsValid(c); synchronized (opTracker) { lsmIndex.addComponent(c); + if (replicationEnabled) { + componentsToBeReplicated.clear(); + componentsToBeReplicated.add(c); + triggerReplication(componentsToBeReplicated, true); + } mergePolicy.diskComponentAdded(lsmIndex, false); } } @@ -435,4 +466,31 @@ public ILSMOperationTracker getOperationTracker() { return opTracker; } + + protected void triggerReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException { + ILSMIndexAccessorInternal accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, + NoOpOperationCallback.INSTANCE); + accessor.scheduleReplication(lsmComponents, bulkload); + } + + @Override + public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload) + throws HyracksDataException { + + //enter the LSM components to be replicated to prevent them from being deleted until they are replicated + if (!getAndEnterComponents(ctx, LSMOperationType.REPLICATE, false)) { + return; + } + + lsmIndex.scheduleReplication(ctx, lsmComponents, bulkload, ReplicationOperation.REPLICATE); + } + + @Override + public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException { + try { + exitComponents(ctx, LSMOperationType.REPLICATE, null, false); + } catch (IndexException e) { + throw new HyracksDataException(e); + } + } } diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java new file mode 100644 index 0000000..232bfb2 --- /dev/null +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java @@ -0,0 +1,42 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.storage.am.lsm.common.impls; + +import java.util.Set; + +import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import edu.uci.ics.hyracks.api.replication.impl.AbstractReplicationJob; +import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; + +public class LSMIndexReplicationJob extends AbstractReplicationJob implements ILSMIndexReplicationJob { + + private final AbstractLSMIndex lsmIndex; + private final ILSMIndexOperationContext ctx; + + public LSMIndexReplicationJob(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext ctx, + Set<String> filesToReplicate, ReplicationOperation operation, ReplicationExecutionType executionType) { + super(ReplicationJobType.LSM_COMPONENT, operation, executionType, filesToReplicate); + this.lsmIndex = lsmIndex; + this.ctx = ctx; + } + + @Override + public void endReplication() throws HyracksDataException { + if (ctx != null) { + lsmIndex.lsmHarness.endReplication(ctx); + } + } +} diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java index 853b6d0..070b0da 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java @@ -19,5 +19,6 @@ MODIFICATION, FORCE_MODIFICATION, FLUSH, - MERGE + MERGE, + REPLICATE } diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index c828bd2..9d2cf36 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -126,6 +126,14 @@ ctx.getComponentsToBeMerged().addAll(components); lsmHarness.scheduleMerge(ctx, callback); } + + @Override + public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException { + ctx.setOperation(IndexOperation.REPLICATE); + ctx.getComponentsToBeReplicated().clear(); + ctx.getComponentsToBeReplicated().addAll(lsmComponents); + lsmHarness.scheduleReplication(ctx, lsmComponents, bulkload); + } @Override public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException { diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java index 3c009c10..63b3349 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java @@ -16,6 +16,7 @@ import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; import edu.uci.ics.hyracks.api.io.FileReference; +import edu.uci.ics.hyracks.api.replication.IIOReplicationManager; import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage; import edu.uci.ics.hyracks.storage.common.file.IFileMapManager; @@ -146,4 +147,13 @@ return 0; } + @Override + public boolean isReplicationEnabled() { + return false; + } + + @Override + public IIOReplicationManager getIIOReplicationManager() { + return null; + } } diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java index 80b2897..d5d04fb 100644 --- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java +++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java @@ -23,6 +23,7 @@ import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; import edu.uci.ics.hyracks.api.io.FileReference; +import edu.uci.ics.hyracks.api.replication.IIOReplicationManager; import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator; import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage; @@ -371,4 +372,14 @@ public int getFileReferenceCount(int fileId) { return 0; } + + @Override + public boolean isReplicationEnabled() { + return false; + } + + @Override + public IIOReplicationManager getIIOReplicationManager() { + return null; + } } diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java index 7bfb378..4250e6d 100644 --- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java +++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java @@ -16,7 +16,9 @@ import java.io.File; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits; @@ -323,6 +325,9 @@ break; case FULL_MERGE: operationalComponents.addAll(immutableComponents); + break; + case REPLICATE: + operationalComponents.addAll(ctx.getComponentsToBeReplicated()); break; default: throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported."); @@ -934,4 +939,19 @@ public boolean isPrimaryIndex() { return false; } + + @Override + public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) { + Set<String> files = new HashSet<String>(); + + LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) lsmComponent; + OnDiskInvertedIndex invIndex = (OnDiskInvertedIndex) invIndexComponent.getInvIndex(); + + files.add(invIndex.getInvListsFile().toString()); + files.add(invIndex.getBTree().toString()); + files.add(invIndexComponent.getBloomFilter().getFileReference().toString()); + files.add(invIndexComponent.getDeletedKeysBTree().getFileReference().toString()); + + return files; + } } diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index f21cc2a..03a2691 100644 --- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -96,6 +96,14 @@ ctx.getComponentsToBeMerged().addAll(components); lsmHarness.scheduleMerge(ctx, callback); } + + @Override + public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException { + ctx.setOperation(IndexOperation.REPLICATE); + ctx.getComponentsToBeReplicated().clear(); + ctx.getComponentsToBeReplicated().addAll(lsmComponents); + lsmHarness.scheduleReplication(ctx, lsmComponents, bulkload); + } @Override public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException { @@ -171,4 +179,5 @@ throws HyracksDataException, IndexException { throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm inverted index."); } + } diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java index 7a8305d..1b1f65b 100644 --- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java +++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java @@ -38,6 +38,7 @@ private IndexOperation op; private final List<ILSMComponent> componentHolder; private final List<ILSMComponent> componentsToBeMerged; + private final List<ILSMComponent> componentsToBeReplicated; public final IModificationOperationCallback modificationCallback; public final ISearchOperationCallback searchCallback; @@ -64,6 +65,7 @@ int[] invertedIndexFields, int[] filterFields) throws HyracksDataException { this.componentHolder = new LinkedList<ILSMComponent>(); this.componentsToBeMerged = new LinkedList<ILSMComponent>(); + this.componentsToBeReplicated = new LinkedList<ILSMComponent>(); this.modificationCallback = modificationCallback; this.searchCallback = searchCallback; @@ -105,6 +107,7 @@ public void reset() { componentHolder.clear(); componentsToBeMerged.clear(); + componentsToBeReplicated.clear(); } @Override @@ -154,4 +157,9 @@ public ISearchPredicate getSearchPredicate() { return searchPredicate; } + + @Override + public List<ILSMComponent> getComponentsToBeReplicated() { + return componentsToBeReplicated; + } } diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java index 4f866a7..4b93100 100644 --- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java +++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java @@ -270,6 +270,9 @@ case FULL_MERGE: operationalComponents.addAll(immutableComponents); break; + case REPLICATE: + operationalComponents.addAll(ctx.getComponentsToBeReplicated()); + break; default: throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported."); } diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java index 66a9250..ea66b46 100644 --- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java +++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java @@ -483,6 +483,9 @@ case FULL_MERGE: operationalComponents.addAll(immutableComponents); break; + case REPLICATE: + operationalComponents.addAll(ctx.getComponentsToBeReplicated()); + break; case FLUSH: // Do nothing. this is left here even though the index never // performs flushes because a flush is triggered by diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java index 0ce83dc..b49199d 100644 --- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java +++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java @@ -32,6 +32,7 @@ private MultiComparator rTreeCmp; public final List<ILSMComponent> componentHolder; private final List<ILSMComponent> componentsToBeMerged; + private final List<ILSMComponent> componentsToBeReplicated; public final ISearchOperationCallback searchCallback; private final int targetIndexVersion; public ISearchPredicate searchPredicate; @@ -42,6 +43,7 @@ this.componentHolder = new LinkedList<ILSMComponent>(); this.componentsToBeMerged = new LinkedList<ILSMComponent>(); + this.componentsToBeReplicated = new LinkedList<ILSMComponent>(); this.searchCallback = searchCallback; this.targetIndexVersion = targetIndexVersion; this.bTreeCmp = MultiComparator.create(btreeCmpFactories); @@ -62,6 +64,7 @@ public void reset() { componentHolder.clear(); componentsToBeMerged.clear(); + componentsToBeReplicated.clear(); } @Override @@ -111,4 +114,9 @@ public ISearchPredicate getSearchPredicate() { return searchPredicate; } + + @Override + public List<ILSMComponent> getComponentsToBeReplicated() { + return componentsToBeReplicated; + } } \ No newline at end of file diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index 825874a..b84fb8b 100644 --- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -16,7 +16,9 @@ package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory; @@ -617,4 +619,18 @@ forceFlushDirtyPages(component.getBTree()); markAsValidInternal(component.getBTree()); } + + @Override + public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) { + Set<String> files = new HashSet<String>(); + + LSMRTreeDiskComponent component = (LSMRTreeDiskComponent) lsmComponent; + + files.add(component.getBTree().getFileReference().toString()); + files.add(component.getRTree().getFileReference().toString()); + files.add(component.getBloomFilter().getFileReference().toString()); + + return files; + } + } diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java index b2c939a..4ff05e6 100644 --- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java +++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java @@ -51,6 +51,7 @@ private IndexOperation op; public final List<ILSMComponent> componentHolder; private final List<ILSMComponent> componentsToBeMerged; + private final List<ILSMComponent> componentsToBeReplicated; public final IModificationOperationCallback modificationCallback; public final ISearchOperationCallback searchCallback; public final PermutingTupleReference indexTuple; @@ -86,6 +87,7 @@ currentBTreeOpContext = btreeOpContexts[0]; this.componentHolder = new LinkedList<ILSMComponent>(); this.componentsToBeMerged = new LinkedList<ILSMComponent>(); + this.componentsToBeReplicated = new LinkedList<ILSMComponent>(); this.modificationCallback = modificationCallback; this.searchCallback = searchCallback; @@ -162,4 +164,9 @@ public ISearchPredicate getSearchPredicate() { return searchPredicate; } + + @Override + public List<ILSMComponent> getComponentsToBeReplicated() { + return componentsToBeReplicated; + } } \ No newline at end of file diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java index f1703b1..4eb1709 100644 --- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java +++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java @@ -16,7 +16,9 @@ package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory; @@ -447,4 +449,15 @@ forceFlushDirtyPages(rtree); markAsValidInternal(rtree); } + + @Override + public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) { + Set<String> files = new HashSet<String>(); + + RTree rtree = ((LSMRTreeDiskComponent) lsmComponent).getRTree(); + files.add(rtree.getFileReference().toString()); + + return files; + } + } diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java index f5b5b17..94ad801 100644 --- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java @@ -35,6 +35,7 @@ import edu.uci.ics.hyracks.api.io.IFileHandle; import edu.uci.ics.hyracks.api.io.IIOManager; import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent; +import edu.uci.ics.hyracks.api.replication.IIOReplicationManager; import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle; import edu.uci.ics.hyracks.storage.common.file.IFileMapManager; @@ -55,9 +56,8 @@ private final CleanerThread cleanerThread; private final Map<Integer, BufferedFileHandle> fileInfoMap; private final Set<Integer> virtualFiles; - + private IIOReplicationManager ioReplicationManager; private List<ICachedPageInternal> cachedPages = new ArrayList<ICachedPageInternal>(); - private boolean closed; public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy, @@ -81,6 +81,15 @@ cleanerThread = new CleanerThread(); executor.execute(cleanerThread); closed = false; + } + + //this constructor is used when replication is enabled to pass the IIOReplicationManager + public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy, + IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, + ThreadFactory threadFactory, IIOReplicationManager ioReplicationManager) { + + this(ioManager, pageReplacementStrategy, pageCleanerPolicy, fileMapManager, maxOpenFiles, threadFactory); + this.ioReplicationManager = ioReplicationManager; } @Override @@ -366,7 +375,7 @@ buffer.append("Buffer cache state\n"); buffer.append("Page Size: ").append(pageSize).append('\n'); buffer.append("Number of physical pages: ").append(pageReplacementStrategy.getMaxAllowedNumPages()) - .append('\n'); + .append('\n'); buffer.append("Hash table size: ").append(pageMap.length).append('\n'); buffer.append("Page Map:\n"); int nCachedPages = 0; @@ -379,10 +388,10 @@ buffer.append(" ").append(i).append('\n'); while (cp != null) { buffer.append(" ").append(cp.cpid).append(" -> [") - .append(BufferedFileHandle.getFileId(cp.dpid)).append(':') - .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get()) - .append(", ").append(cp.valid ? "valid" : "invalid").append(", ") - .append(cp.dirty.get() ? "dirty" : "clean").append("]\n"); + .append(BufferedFileHandle.getFileId(cp.dpid)).append(':') + .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get()) + .append(", ").append(cp.valid ? "valid" : "invalid").append(", ") + .append(cp.dirty.get() ? "dirty" : "clean").append("]\n"); cp = cp.next; ++nCachedPages; } @@ -808,4 +817,17 @@ public void dumpState(OutputStream os) throws IOException { os.write(dumpState().getBytes()); } + + @Override + public boolean isReplicationEnabled() { + if (ioReplicationManager != null) { + return ioReplicationManager.isReplicationEnabled(); + } + return false; + } + + @Override + public IIOReplicationManager getIIOReplicationManager() { + return ioReplicationManager; + } } diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java index 3a88091..3716445 100644 --- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java +++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java @@ -19,6 +19,7 @@ import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; import edu.uci.ics.hyracks.api.io.FileReference; +import edu.uci.ics.hyracks.api.replication.IIOReplicationManager; /** * Implementation of an IBufferCache that counts the number of pins/unpins, @@ -192,5 +193,14 @@ public int getFileReferenceCount(int fileId) { return bufferCache.getFileReferenceCount(fileId); } + + @Override + public boolean isReplicationEnabled() { + return false; + } + @Override + public IIOReplicationManager getIIOReplicationManager() { + return null; + } } diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java index 478c641..957d19d 100644 --- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java +++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java @@ -16,6 +16,7 @@ import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; import edu.uci.ics.hyracks.api.io.FileReference; +import edu.uci.ics.hyracks.api.replication.IIOReplicationManager; public interface IBufferCache { public void createFile(FileReference fileRef) throws HyracksDataException; @@ -51,4 +52,9 @@ public int getFileReferenceCount(int fileId); public void close() throws HyracksDataException; + + public boolean isReplicationEnabled(); + + public IIOReplicationManager getIIOReplicationManager(); + } \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/322 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I80565fc9d74e30440d2df5917911904ba8f33c25 Gerrit-PatchSet: 4 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
