Young-Seok Kim has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/795
Change subject: Fix ASTERIXDB-1011: add flow control for merge policies
......................................................................
Fix ASTERIXDB-1011: add flow control for merge policies
Flushes are held back while the index's merge policy reports (via the new ILSMMergePolicy.isMergeLagging()) that merges are lagging.
Change-Id: Ide99c022861f96cd60bc8f5795c4964ab02b3e14
---
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
6 files changed, 130 insertions(+), 13 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/95/795/1
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index a7374d3..3d112ef 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -46,13 +46,13 @@
private final int datasetID;
public CorrelatedPrefixMergePolicy(IIndexLifecycleManager
datasetLifecycleManager, int datasetID) {
- this.datasetLifecycleManager =
(DatasetLifecycleManager)datasetLifecycleManager;
+ this.datasetLifecycleManager = (DatasetLifecycleManager)
datasetLifecycleManager;
this.datasetID = datasetID;
}
@Override
- public void diskComponentAdded(final ILSMIndex index, boolean
fullMergeIsRequested) throws HyracksDataException,
- IndexException {
+ public void diskComponentAdded(final ILSMIndex index, boolean
fullMergeIsRequested)
+ throws HyracksDataException, IndexException {
// This merge policy will only look at primary indexes in order to
evaluate if a merge operation is needed. If it decides that
// a merge operation is needed, then it will merge *all* the indexes
that belong to the dataset. The criteria to decide if a merge
// is needed is the same as the one that is used in the prefix merge
policy:
@@ -113,8 +113,8 @@
// Reverse the components order back to its original order
Collections.reverse(mergableComponents);
- ILSMIndexAccessor accessor = lsmIndex.createAccessor(
- NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
+ ILSMIndexAccessor accessor =
lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
accessor.scheduleMerge(lsmIndex.getIOOperationCallback(),
mergableComponents);
}
break;
@@ -127,4 +127,10 @@
maxMergableComponentSize =
Long.parseLong(properties.get("max-mergable-component-size"));
maxToleranceComponentCount =
Integer.parseInt(properties.get("max-tolerance-component-count"));
}
+
+ @Override
+ public boolean isMergeLagging(ILSMIndex index) {
+ //TODO implement properly according to this merge policy; returning false means flushes are never held back for this policy
+ return false;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 841117b..4afe4f6 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -25,8 +25,10 @@
import org.apache.hyracks.storage.am.common.api.IndexException;
public interface ILSMMergePolicy {
- public void diskComponentAdded(ILSMIndex index, boolean
fullMergeIsRequested) throws HyracksDataException,
- IndexException;
+ public void diskComponentAdded(ILSMIndex index, boolean
fullMergeIsRequested)
+ throws HyracksDataException, IndexException;
public void configure(Map<String, String> properties);
+
+ public boolean isMergeLagging(ILSMIndex index); // while this returns true, LSMHarness holds FLUSH operations on the index (flow control)
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 93d6fd4..7176122 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -35,8 +35,8 @@
private int numComponents;
@Override
- public void diskComponentAdded(final ILSMIndex index, boolean
fullMergeIsRequested) throws HyracksDataException,
- IndexException {
+ public void diskComponentAdded(final ILSMIndex index, boolean
fullMergeIsRequested)
+ throws HyracksDataException, IndexException {
List<ILSMComponent> immutableComponents =
index.getImmutableComponents();
for (ILSMComponent c : immutableComponents) {
if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
@@ -58,4 +58,25 @@
public void configure(Map<String, String> properties) {
numComponents = Integer.parseInt(properties.get("num-components"));
}
+
+ @Override
+ public boolean isMergeLagging(ILSMIndex index) {
+ // lagging iff the first numComponents immutable components are all READABLE_UNWRITABLE;
see PrefixMergePolicy.isMergeLagging() for the flow-control rationale.
+
+ List<ILSMComponent> immutableComponents =
index.getImmutableComponents();
+ int totalImmutableComponentCount = immutableComponents.size();
+ int mergableImmutableComponentCount = 0;
+ int i = 0;
+ for (i = 0; i < totalImmutableComponentCount; i++) {
+ AbstractDiskLSMComponent c = (AbstractDiskLSMComponent)
immutableComponents.get(i);
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ break;
+ }
+ ++mergableImmutableComponentCount;
+ if (mergableImmutableComponentCount == numComponents) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index a19532f..b58cc29 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -200,6 +200,25 @@
try {
synchronized (opTracker) {
try {
+
+ /**
+ * [flow control]
+ * If merge operations lag behind according to the merge
policy,
+ * flushes of in-memory components are held until the merge
operations catch up.
+ * See PrefixMergePolicy.isMergeLagging() for more details.
+ */
+ if (opType == LSMOperationType.FLUSH) {
+ while (mergePolicy.isMergeLagging(lsmIndex)) {
+ try {
+ opTracker.wait();
+ } catch (InterruptedException e) {
+ //ignore -- NOTE(review): this drops the thread's interrupt status; consider restoring it after the loop
+ }
+ }
+ } else if (opType == LSMOperationType.MERGE) {
+ opTracker.notifyAll(); // wake flushes waiting above so they re-check isMergeLagging() once this merge's components change state
+ }
+
int i = 0;
// First check if there is any action that is needed to be
taken based on the state of each component.
for (ILSMComponent c : ctx.getComponentHolder()) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 39ab815..86be9c8 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -28,8 +28,8 @@
public class NoMergePolicy implements ILSMMergePolicy {
@Override
- public void diskComponentAdded(final ILSMIndex index, boolean
fullMergeIsRequested) throws HyracksDataException,
- IndexException {
+ public void diskComponentAdded(final ILSMIndex index, boolean
fullMergeIsRequested)
+ throws HyracksDataException, IndexException {
// Do nothing
}
@@ -37,4 +37,9 @@
public void configure(Map<String, String> properties) {
// Do nothing
}
+
+ @Override
+ public boolean isMergeLagging(ILSMIndex index) {
+ return false; // this policy never schedules merges, so flushes must never wait for one
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 5b8da53..4409759 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -39,8 +39,8 @@
private int maxToleranceComponentCount;
@Override
- public void diskComponentAdded(final ILSMIndex index, boolean
fullMergeIsRequested) throws HyracksDataException,
- IndexException {
+ public void diskComponentAdded(final ILSMIndex index, boolean
fullMergeIsRequested)
+ throws HyracksDataException, IndexException {
// 1. Look at the candidate components for merging in oldest-first
order. If one exists, identify the prefix of the sequence of
// all such components for which the sum of their sizes exceeds
MaxMrgCompSz. Schedule a merge of those components into a new component.
// 2. If a merge from 1 doesn't happen, see if the set of candidate
components for merging exceeds MaxTolCompCnt. If so, schedule
@@ -93,4 +93,68 @@
maxMergableComponentSize =
Long.parseLong(properties.get("max-mergable-component-size"));
maxToleranceComponentCount =
Integer.parseInt(properties.get("max-tolerance-component-count"));
}
+
+ @Override
+ public boolean isMergeLagging(ILSMIndex index) {
+
+ /**
+ * [for flow-control purpose]
+ * when merge operations lag behind, threads that flush components
will be blocked
+ * until merge operations catch up, i.e., until the number of mergable
immutable components < maxToleranceComponentCount
+ * example:
+ * suppose that maxToleranceComponentCount = 3 and
maxMergableComponentSize = 1GB
+ * The following shows a set of events that occurred at time ti with a
brief description.
+ * time
+ * t40: c32-1(1GB, RU) c38-33(192MB, RU) c39-39(32MB, RU) c40-40(32MB,
RU)
+ * --> a thread which added c40-40 will trigger a merge including
c38-33,c39-39,c40-40
+ * t41: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM)
c40-40(32MB, RUM) c41-41(32MB, RU)
+ * --> a thread which added c41-41 will not be blocked
+ * t42: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM)
c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU)
+ * --> a thread which added c42-42 will not be blocked
+ * t43: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM)
c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU)
+ * --> a thread which added c43-43 will not be blocked and will not
trigger a merge since there is an ongoing merge triggered in t40.
+ * t44: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM)
c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU)
'c44-44(32MB, RU)'
+ * --> a thread which will add c44-44 (even if the disk component is
created, but not added to index instance disk components yet)
+ * will be blocked until the number of RU components <
maxToleranceComponentCount
+ * t45: c32-1(1GB, RU) *c40-33(256MB, RU)* c41-41(32MB, RU)
c42-42(32MB, RU) c43-43(32MB, RU) 'c44-44(32MB, RU)'
+ * --> a thread which completed the merge triggered in t40 added c40-33
and will go ahead and trigger the next merge with c40-33,c41-41,c42-42,c43-43.
+ * Still, the blocked thread will continue being blocked and the
c44-44 was not included in the merge since it's not added yet.
+ * t46: c32-1(1GB, RU) c40-33(256MB, RUM) c41-41(32MB, RUM)
c42-42(32MB, RUM) c43-43(32MB, RUM) c44-44(32MB, RU)
+ * --> the merge triggered in t45 is going on and the merge unblocked
the blocked thread, so c44-44 was added.
+ * t47: c32-1(1GB, RU) *c43-33(320MB, RU)* c44-44(32MB, RU)
+ * --> a thread completed the merge triggered in t45 and added c43-33.
+ * t48: c32-1(1GB, RU) c43-33(320MB, RU) c44-44(32MB, RU)
c48-48(32MB, RU)
+ * --> a thread added c48-48 and will not be blocked and will trigger
a merge with c43-33, c44-44, c48-48.
+ * ... continues ...
+ * ----------------------------------------
+ * legend:
+ * For example, C32-1 represents a disk component, more specifically,
disk component name, where 32-1 represents a timestamp range from t1 to time
t32.
+ * This means that the component C32-1 is a component resulting from a
merge operation that merged components C1-1 to C32-32.
+ * This also implies that if two timestamps in a component name are
equal, the component has not been merged yet after it was created.
+ * RU and RUM are possible states of disk components, where RU
represents READABLE_UNWRITABLE and RUM represents READABLE_UNWRITABLE_MERGING.
+ * Now, c32-1(1GB, RU) represents a disk component resulting from
merging c1-1 ~ c32-32 and the component size is 1GB.
+ * ----------------------------------------
+ * The flow control allows at most maxToleranceComponentCount mergable
components,
+ * where the mergable components are disk components whose i) state ==
RU and ii) size <= maxMergableComponentSize.
+ */
+
+ List<ILSMComponent> immutableComponents =
index.getImmutableComponents();
+ int totalImmutableComponentCount = immutableComponents.size();
+ int mergableImmutableComponentCount = 0;
+ int i = 0;
+ for (i = 0; i < totalImmutableComponentCount; i++) {
+ AbstractDiskLSMComponent c = (AbstractDiskLSMComponent)
immutableComponents.get(i);
+ if (c.getComponentSize() > maxMergableComponentSize) {
+ break;
+ }
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ break;
+ }
+ ++mergableImmutableComponentCount;
+ if (mergableImmutableComponentCount == maxToleranceComponentCount)
{
+ return true;
+ }
+ }
+ return false;
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/795
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ide99c022861f96cd60bc8f5795c4964ab02b3e14
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Young-Seok Kim <[email protected]>