Young-Seok Kim has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/795

Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................

PLEASE EDIT to provide a meaningful commit message!

Fix ASTERIXDB-1011 issue: added flow control for merge policy

Change-Id: Ide99c022861f96cd60bc8f5795c4964ab02b3e14
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
6 files changed, 130 insertions(+), 13 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/95/795/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index a7374d3..3d112ef 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -46,13 +46,13 @@
     private final int datasetID;
 
     public CorrelatedPrefixMergePolicy(IIndexLifecycleManager 
datasetLifecycleManager, int datasetID) {
-        this.datasetLifecycleManager = 
(DatasetLifecycleManager)datasetLifecycleManager;
+        this.datasetLifecycleManager = (DatasetLifecycleManager) 
datasetLifecycleManager;
         this.datasetID = datasetID;
     }
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean 
fullMergeIsRequested) throws HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean 
fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
         // This merge policy will only look at primary indexes in order to 
evaluate if a merge operation is needed. If it decides that
         // a merge operation is needed, then it will merge *all* the indexes 
that belong to the dataset. The criteria to decide if a merge
         // is needed is the same as the one that is used in the prefix merge 
policy:
@@ -113,8 +113,8 @@
                     // Reverse the components order back to its original order
                     Collections.reverse(mergableComponents);
 
-                    ILSMIndexAccessor accessor = lsmIndex.createAccessor(
-                            NoOpOperationCallback.INSTANCE, 
NoOpOperationCallback.INSTANCE);
+                    ILSMIndexAccessor accessor = 
lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+                            NoOpOperationCallback.INSTANCE);
                     accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), 
mergableComponents);
                 }
                 break;
@@ -127,4 +127,10 @@
         maxMergableComponentSize = 
Long.parseLong(properties.get("max-mergable-component-size"));
         maxToleranceComponentCount = 
Integer.parseInt(properties.get("max-tolerance-component-count"));
     }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) {
+        //TODO implement properly according to the merge policy
+        return false;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 841117b..4afe4f6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -25,8 +25,10 @@
 import org.apache.hyracks.storage.am.common.api.IndexException;
 
 public interface ILSMMergePolicy {
-    public void diskComponentAdded(ILSMIndex index, boolean 
fullMergeIsRequested) throws HyracksDataException,
-            IndexException;
+    public void diskComponentAdded(ILSMIndex index, boolean 
fullMergeIsRequested)
+            throws HyracksDataException, IndexException;
 
     public void configure(Map<String, String> properties);
+
+    public boolean isMergeLagging(ILSMIndex index);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 93d6fd4..7176122 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -35,8 +35,8 @@
     private int numComponents;
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean 
fullMergeIsRequested) throws HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean 
fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
         List<ILSMComponent> immutableComponents = 
index.getImmutableComponents();
         for (ILSMComponent c : immutableComponents) {
             if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
@@ -58,4 +58,25 @@
     public void configure(Map<String, String> properties) {
         numComponents = Integer.parseInt(properties.get("num-components"));
     }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) {
+        // see PrefixMergePolicy.isMergeLagging() for the rationale behind 
this code.
+
+        List<ILSMComponent> immutableComponents = 
index.getImmutableComponents();
+        int totalImmutableComponentCount = immutableComponents.size();
+        int mergableImmutableComponentCount = 0;
+        int i = 0;
+        for (i = 0; i < totalImmutableComponentCount; i++) {
+            AbstractDiskLSMComponent c = (AbstractDiskLSMComponent) 
immutableComponents.get(i);
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                break;
+            }
+            ++mergableImmutableComponentCount;
+            if (mergableImmutableComponentCount == numComponents) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index a19532f..b58cc29 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -200,6 +200,25 @@
         try {
             synchronized (opTracker) {
                 try {
+
+                    /**
+                     * [flow control]
+                     * If merge operations are lagging according to the merge 
policy,
+                     * flushing of in-memory components is held until the merge 
operations catch up.
+                     * See PrefixMergePolicy.isMergeLagging() for more details.
+                     */
+                    if (opType == LSMOperationType.FLUSH) {
+                        while (mergePolicy.isMergeLagging(lsmIndex)) {
+                            try {
+                                opTracker.wait();
+                            } catch (InterruptedException e) {
+                                //ignore
+                            }
+                        }
+                    } else if (opType == LSMOperationType.MERGE) {
+                        opTracker.notifyAll();
+                    }
+
                     int i = 0;
                     // First check if there is any action that is needed to be 
taken based on the state of each component.
                     for (ILSMComponent c : ctx.getComponentHolder()) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 39ab815..86be9c8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -28,8 +28,8 @@
 public class NoMergePolicy implements ILSMMergePolicy {
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean 
fullMergeIsRequested) throws HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean 
fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
         // Do nothing
     }
 
@@ -37,4 +37,9 @@
     public void configure(Map<String, String> properties) {
         // Do nothing
     }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) {
+        return false;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 5b8da53..4409759 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -39,8 +39,8 @@
     private int maxToleranceComponentCount;
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean 
fullMergeIsRequested) throws HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean 
fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
         // 1.  Look at the candidate components for merging in oldest-first 
order.  If one exists, identify the prefix of the sequence of
         // all such components for which the sum of their sizes exceeds 
MaxMrgCompSz.  Schedule a merge of those components into a new component.
         // 2.  If a merge from 1 doesn't happen, see if the set of candidate 
components for merging exceeds MaxTolCompCnt.  If so, schedule
@@ -93,4 +93,68 @@
         maxMergableComponentSize = 
Long.parseLong(properties.get("max-mergable-component-size"));
         maxToleranceComponentCount = 
Integer.parseInt(properties.get("max-tolerance-component-count"));
     }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) {
+
+        /**
+         * [for flow-control purpose]
+         * When merge operations are lagging, threads which flush components 
will be blocked
+         * until merge operations catch up, i.e., until the number of mergable 
immutable components < maxToleranceComponentCount.
+         * example:
+         * suppose that maxToleranceComponentCount = 3 and 
maxMergableComponentSize = 1GB
+         * The following shows a set of events occurred in time ti with a 
brief description.
+         * time
+         * t40: c32-1(1GB, RU) c38-33(192MB, RU) c39-39(32MB, RU) c40-40(32MB, 
RU)
+         * --> a thread which added c40-40 will trigger a merge including 
c38-33,c39-39,c40-40
+         * t41: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) 
c40-40(32MB, RUM) c41-41(32MB, RU)
+         * --> a thread which added c41-41 will not be blocked
+         * t42: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) 
c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU)
+         * --> a thread which added c42-42 will not be blocked
+         * t43: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) 
c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU)
+         * --> a thread which added c43-43 will not be blocked and will not 
trigger a merge since there is an ongoing merge triggered in t40.
+         * t44: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) 
c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU) 
'c44-44(32MB, RU)'
+         * --> a thread which will add c44-44 (even if the disk component is 
created, but not added to index instance disk components yet)
+         * will be blocked until the number of RU components < 
maxToleranceComponentCount
+         * t45: c32-1(1GB, RU) *c40-33(256MB, RU)* c41-41(32MB, RU) 
c42-42(32MB, RU) c43-43(32MB, RU) 'c44-44(32MB, RU)'
+         * --> a thread which completed the merge triggered in t40 added c40-33 
and will go ahead and trigger the next merge with c40-33,c41-41,c42-42,c43-43.
+         * Still, the blocked thread will continue being blocked and the 
c44-44 was not included in the merge since it's not added yet.
+         * t46: c32-1(1GB, RU) c40-33(256MB, RUM) c41-41(32MB, RUM) 
c42-42(32MB, RUM) c43-43(32MB, RUM) c44-44(32MB, RUM)
+         * --> the merge triggered in t45 is going on and the merge unblocked 
the blocked thread, so c44-44 was added.
+         * t47: c32-1(1GB, RU) *c43-33(320MB, RU)* c44-44(32MB, RUM)
+         * --> a thread completed the merge triggered in t45 and added c43-33.
+         * t48: c32-1(1GB, RU) c43-33(320MB, RU) c44-44(32MB, RUM) 
c48-48(32MB, RU)
+         * --> a thread added c48-48 and will not be blocked and will trigger 
a merge with c43-33, c44-44, c48-48.
+         * ... continues ...
+         * ----------------------------------------
+         * legend:
+         * For example, C32-1 represents a disk component, more specifically, 
disk component name, where 32-1 represents a timestamp range from t1 to time 
t32.
+         * This means that the component C32-1 is a component resulting from a 
merge operation that merged components C1-1 to C32-32.
+         * This also implies that if two timestamps in a component name are 
equal, the component has not been merged yet after it was created.
+         * RU and RUM are possible states of disk components, where RU 
represents READABLE_UNWRITABLE and RUM represents READABLE_UNWRITABLE_MERGING.
+         * Now, c32-1(1GB, RU) represents a disk component resulting from 
merging c1-1 ~ c32-32 and the component size is 1GB.
+         * ----------------------------------------
+         * The flow control allows at most maxToleranceComponentCount mergable 
components,
+         * where the mergable components are disk components whose i) state == 
RU and ii) size <= maxMergableComponentSize.
+         */
+
+        List<ILSMComponent> immutableComponents = 
index.getImmutableComponents();
+        int totalImmutableComponentCount = immutableComponents.size();
+        int mergableImmutableComponentCount = 0;
+        int i = 0;
+        for (i = 0; i < totalImmutableComponentCount; i++) {
+            AbstractDiskLSMComponent c = (AbstractDiskLSMComponent) 
immutableComponents.get(i);
+            if (c.getComponentSize() > maxMergableComponentSize) {
+                break;
+            }
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                break;
+            }
+            ++mergableImmutableComponentCount;
+            if (mergableImmutableComponentCount == maxToleranceComponentCount) 
{
+                return true;
+            }
+        }
+        return false;
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/795
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ide99c022861f96cd60bc8f5795c4964ab02b3e14
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Young-Seok Kim <[email protected]>

Reply via email to