This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cced3e  [NO ISSUE][STO] Fix search when switching from memory to disk component
     new f2b2ac1  Merge branch 'gerrit/neo'
4cced3e is described below

commit 4cced3e30c31d8330e69f0a10f5b0a46ee13d7bf
Author: Ali Alsuliman <ali.al.solai...@gmail.com>
AuthorDate: Mon Jan 24 20:40:17 2022 -0800

    [NO ISSUE][STO] Fix search when switching from memory to disk component
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - When searching the index and switching from the memory components
      to the disk components, keep the states of the queue and of the
      cursors on the switched-to disk components the same as they were
      on the memory components. If a cursor was the one that produced
      the outputElement, do not push the next element from that cursor
      into the queue, since the queue should not contain an element from
      this cursor. Restart the search operation at the elements that the
      cursors were at and consume them, since they had already been
      consumed before the switch was made.
    
    - add test case.
    
    Change-Id: I647641f6044c1edf1477049be1c5d1b697f404c1
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14885
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
---
 .../asterix/app/bootstrap/TestNodeController.java  | 36 +++++----
 .../dataflow/SearchCursorComponentSwitchTest.java  | 87 +++++++++++++++++++++-
 .../lsm/btree/impls/LSMBTreeRangeSearchCursor.java | 35 +++++++--
 .../common/impls/ComponentReplacementContext.java  |  2 +-
 .../am/lsm/common/impls/LSMIndexSearchCursor.java  |  3 +-
 5 files changed, 138 insertions(+), 25 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index f40628fb..0b80881 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -52,6 +52,7 @@ import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
@@ -66,6 +67,8 @@ import 
org.apache.asterix.transaction.management.runtime.CommitRuntime;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
@@ -813,10 +816,10 @@ public class TestNodeController {
         RecordDescriptor upsertOutRecDesc = 
getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
                 filterFields == null ? 0 : filterFields.length, recordType, 
metaType);
         // fix pk fields
-        int diff = upsertOutRecDesc.getFieldCount() - 
primaryIndexInfo.rDesc.getFieldCount();
+        int start = 1 + (dataset.hasMetaPart() ? 2 : 1) + (filterFields == 
null ? 0 : filterFields.length);
         int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
         for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
-            pkFieldsInCommitOp[i] = diff + i;
+            pkFieldsInCommitOp[i] = start++;
         }
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), 
dataset.getDatasetId(), pkFieldsInCommitOp,
                 true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
@@ -827,19 +830,26 @@ public class TestNodeController {
 
     private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor 
inputRecordDesc, Dataset dataset, int numFilterFields,
             ARecordType itemType, ARecordType metaItemType) throws Exception {
-        ITypeTraits[] outputTypeTraits =
-                new ITypeTraits[inputRecordDesc.getFieldCount() + 
(dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-        ISerializerDeserializer<?>[] outputSerDes = new 
ISerializerDeserializer[inputRecordDesc.getFieldCount()
-                + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+        // 1 boolean field at the beginning to indicate whether the operation 
was upsert or delete
+        int numOutFields = 1 + (dataset.hasMetaPart() ? 2 : 1) + 
numFilterFields + inputRecordDesc.getFieldCount();
+        ITypeTraits[] outputTypeTraits = new ITypeTraits[numOutFields];
+        ISerializerDeserializer<?>[] outputSerDes = new 
ISerializerDeserializer[numOutFields];
 
-        // add the previous record first
+        ISerializerDeserializerProvider serdeProvider = 
FormatUtils.getDefaultFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = 
FormatUtils.getDefaultFormat().getTypeTraitProvider();
         int f = 0;
-        outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+        // add the upsert indicator boolean field
+        outputSerDes[f] = 
serdeProvider.getSerializerDeserializer(BuiltinType.AINT8);
+        outputTypeTraits[f] = 
typeTraitProvider.getTypeTrait(BuiltinType.AINT8);
+        f++;
+        // add the previous record
+        outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType);
+        outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType);
         f++;
         // add the previous meta second
         if (dataset.hasMetaPart()) {
-            outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
-            outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+            outputSerDes[f] = 
serdeProvider.getSerializerDeserializer(metaItemType);
+            outputTypeTraits[f] = typeTraitProvider.getTypeTrait(metaItemType);
             f++;
         }
         // add the previous filter third
@@ -854,10 +864,8 @@ public class TestNodeController {
                 }
             }
             fieldIdx = i;
-            outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider()
-                    .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
-                    
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+            outputTypeTraits[f] = 
typeTraitProvider.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+            outputSerDes[f] = 
serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
             f++;
         }
         for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 24379a3..c5292d3 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.test.dataflow;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -45,6 +44,7 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import 
org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
+import 
org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -53,18 +53,28 @@ import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeSearchCursor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.MultiComparator;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -84,7 +94,7 @@ public class SearchCursorComponentSwitchTest {
     private static final boolean[] UNIQUE_META_FIELDS = null;
     private static final int[] KEY_INDEXES = { 0 };
     private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
-    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new 
Integer[] { Index.RECORD_INDICATOR });
+    private static final List<Integer> KEY_INDICATORS_LIST = 
List.of(Index.RECORD_INDICATOR);
     private static final int TOTAL_NUM_OF_RECORDS = 2000;
     private static final int RECORDS_PER_COMPONENT = 1000;
     private static final int DATASET_ID = 101;
@@ -102,6 +112,7 @@ public class SearchCursorComponentSwitchTest {
     private static IIndexDataflowHelper indexDataflowHelper;
     private static ITransactionContext txnCtx;
     private static LSMPrimaryInsertOperatorNodePushable insertOp;
+    private static LSMPrimaryUpsertOperatorNodePushable upsertOp;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -143,6 +154,8 @@ public class SearchCursorComponentSwitchTest {
                 new 
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, 
META_TYPE, null, KEY_INDEXES,
                 KEY_INDICATORS_LIST, storageManager, null, null).getLeft();
+        upsertOp = nc.getUpsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, 
META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, storageManager, null, false).getLeft();
     }
 
     @After
@@ -202,6 +215,63 @@ public class SearchCursorComponentSwitchTest {
     }
 
     @Test
+    public void testCursorSwitchSucceedWithNoDuplicates() {
+        try {
+            StorageTestUtils.allowAllOps(lsmBtree);
+            lsmBtree.clearSearchCallbacks();
+            RecordTupleGenerator tupleGenerator = new 
RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+                    KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, 
META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            int totalNumRecords = LSMIndexSearchCursor.SWITCH_COMPONENT_CYCLE 
+ 2;
+            ITupleReference[] upsertTuples = new 
ITupleReference[totalNumRecords];
+            for (int j = 0; j < totalNumRecords; j++) {
+                ITupleReference tuple = tupleGenerator.next();
+                upsertTuples[j] = TupleUtils.copyTuple(tuple);
+            }
+
+            // upsert and flush the tuples to create a disk component
+            upsert(tupleAppender, totalNumRecords, upsertTuples, true);
+            // upsert but don't flush the tuples to create a memory component
+            upsert(tupleAppender, totalNumRecords, upsertTuples, false);
+
+            // do the search operation
+            ILSMIndexAccessor accessor = new 
LSMTreeIndexAccessor(lsmBtree.getHarness(),
+                    
lsmBtree.createOpContext(NoOpIndexAccessParameters.INSTANCE), 
LSMBTreeSearchCursor::new);
+            IIndexCursor searchCursor = accessor.createSearchCursor(false);
+            MultiComparator lowKeySearchCmp =
+                    
BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
+            MultiComparator highKeySearchCmp =
+                    
BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
+            RangePredicate rangePredicate =
+                    new RangePredicate(null, null, true, true, 
lowKeySearchCmp, highKeySearchCmp, null, null);
+
+            accessor.search(searchCursor, rangePredicate);
+
+            int count = 0;
+            while (searchCursor.hasNext()) {
+                searchCursor.next();
+                count++;
+                // flush the memory component to disk so that we make the 
switch to it when we hit the switch cycle
+                if (count == 1) {
+                    StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, 
false);
+                }
+            }
+
+            Throwable failure = ResourceReleaseUtils.close(searchCursor, null);
+            failure = CleanupUtils.destroy(failure, searchCursor, accessor);
+            Assert.assertEquals("Records count not matching", totalNumRecords, 
count);
+            if (failure != null) {
+                Assert.fail(failure.getMessage());
+            }
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
     public void testCursorSwitchFails() {
         try {
             // allow all operations
@@ -268,4 +338,17 @@ public class SearchCursorComponentSwitchTest {
         emptyTupleOp.close();
         Assert.assertEquals(numOfRecords, countOp.getCount());
     }
+
+    private void upsert(FrameTupleAppender tupleAppender, int totalNumRecords, 
ITupleReference[] upsertTuples,
+            boolean flush) throws Exception {
+        upsertOp.open();
+        for (int j = 0; j < totalNumRecords; j++) {
+            DataflowUtils.addTupleToFrame(tupleAppender, upsertTuples[j], 
upsertOp);
+        }
+        tupleAppender.write(upsertOp, true);
+        if (flush) {
+            StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+        }
+        upsertOp.close();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 46d279f..2c5fb50 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -236,10 +236,15 @@ public class LSMBTreeRangeSearchCursor extends 
LSMIndexSearchCursor {
                     rangeCursors[i].close();
                     btreeAccessors[i].reset(btree, iap);
                     btreeAccessors[i].search(rangeCursors[i], reusablePred);
-                    
pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
+                    // consume the element that we restarted the search at 
since before the switch it was consumed
+                    if (rangeCursors[i].hasNext()) {
+                        rangeCursors[i].next();
+                        switchedElements[i].reset(rangeCursors[i].getTuple());
+                    }
                 }
             }
             switchRequest[i] = false;
+            switchedElements[i] = null;
             // any failed switch makes further switches pointless
             switchPossible = switchPossible && 
operationalComponents.get(i).getType() == LSMComponentType.DISK;
         }
@@ -264,14 +269,18 @@ public class LSMBTreeRangeSearchCursor extends 
LSMIndexSearchCursor {
                 if (replaceFrom < 0) {
                     replaceFrom = i;
                 }
-                // we return the outputElement to the priority queue if it 
came from this component
+
+                PriorityQueueElement element;
                 if (outputElement != null && outputElement.getCursorIndex() == 
i) {
-                    
pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
-                    needPushElementIntoQueue = false;
-                    outputElement = null;
-                    canCallProceed = true;
+                    // there should be no element from this cursor in the 
queue since the element was polled
+                    if (findElement(outputPriorityQueue, i) != null) {
+                        throw new IllegalStateException("found element in the 
queue from the cursor of output element");
+                    }
+                    element = outputElement;
+                } else {
+                    element = findElement(outputPriorityQueue, i);
                 }
-                PriorityQueueElement element = remove(outputPriorityQueue, i);
+
                 // if this cursor is still active (has an element)
                 // then we copy the search key to restart the operation after
                 // replacing the component
@@ -331,6 +340,18 @@ public class LSMBTreeRangeSearchCursor extends 
LSMIndexSearchCursor {
         return null;
     }
 
+    private PriorityQueueElement 
findElement(PriorityQueue<PriorityQueueElement> outputPriorityQueue, int 
cursorIndex) {
+        // Scans the PQ for the component's element
+        Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
+        while (it.hasNext()) {
+            PriorityQueueElement e = it.next();
+            if (e.getCursorIndex() == cursorIndex) {
+                return e;
+            }
+        }
+        return null;
+    }
+
     @Override
     public void doOpen(ICursorInitialState initialState, ISearchPredicate 
searchPred) throws HyracksDataException {
         LSMBTreeCursorInitialState lsmInitialState = 
(LSMBTreeCursorInitialState) initialState;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
index 70f6d9e..5b0363a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
@@ -193,7 +193,7 @@ public class ComponentReplacementContext implements 
ILSMIndexOperationContext {
             for (int i = 0; i < count; i++) {
                 ILSMComponent removed = 
ctx.getComponentHolder().remove(swapIndexes[i]);
                 if (removed.getType() == LSMComponentType.MEMORY) {
-                    LOGGER.info("Removed a memory component from the search 
operation");
+                    LOGGER.debug("Removed memory component {} from the search 
operation", removed);
                 } else {
                     throw new IllegalStateException("Disk components can't be 
removed from the search operation");
                 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index b6f6e26..27875c0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -40,7 +40,7 @@ import org.apache.hyracks.storage.common.IIndexCursorStats;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor 
implements ILSMIndexCursor {
-    protected static final int SWITCH_COMPONENT_CYCLE = 100;
+    public static final int SWITCH_COMPONENT_CYCLE = 100;
     protected final ILSMIndexOperationContext opCtx;
     protected final boolean returnDeletedTuples;
     protected PriorityQueueElement outputElement;
@@ -119,6 +119,7 @@ public abstract class LSMIndexSearchCursor extends 
EnforcedIndexCursor implement
         needPushElementIntoQueue = false;
         for (int i = 0; i < switchRequest.length; i++) {
             switchRequest[i] = false;
+            switchedElements[i] = null;
         }
         try {
             if (outputPriorityQueue != null) {

Reply via email to