[ASTERIXDB-2161][TEST] Add indexes to MultiPartitionTest

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- This change adds secondary indexes to the multi-partition
  LSM index tests.
- This enables testing specific concurrency scenarios and
  verifying that the properties stored in the primary and
  secondary indexes remain consistent (a sketch of one such
  check follows below).
- In addition, flushDataset in DatasetLifecycleManager now
  throws an IllegalStateException if the number of active
  operations is not 0. Some tests used to call this method
  while operations were still ongoing, which is never
  expected to happen in the actual system (a sketch of the
  new guard also follows below).
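
  A hedged sketch of the kind of primary/secondary consistency
  check the new tests perform (adapted from the assertion added
  in MultiPartitionLSMIndexTest below; the class and method
  names are illustrative only, not part of this change):

    import java.util.List;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
    import org.junit.Assert;

    final class ComponentConsistencySketch {
        private ComponentConsistencySketch() {
        }

        // Every flushed secondary disk component should pair up, by component
        // id, with the primary disk component produced by the same flush.
        static void assertComponentIdsMatch(List<ILSMDiskComponent> primary,
                List<ILSMDiskComponent> secondary) throws Exception {
            for (int i = 0; i < secondary.size(); i++) {
                Assert.assertEquals(secondary.get(i).getId(), primary.get(i).getId());
            }
        }
    }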
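
  A hedged sketch of the new flushDataset guard; only
  PrimaryIndexOperationTracker, getNumActiveOperations(), and the
  IllegalStateException behavior come from this change, while the
  helper name checkNoActiveOperations and the message text are
  hypothetical:

    import org.apache.asterix.common.context.PrimaryIndexOperationTracker;

    final class FlushPreconditionSketch {
        private FlushPreconditionSketch() {
        }

        // Throws if the dataset still has in-flight modifications, mirroring
        // the behavior DatasetLifecycleManager.flushDataset now enforces.
        static void checkNoActiveOperations(PrimaryIndexOperationTracker opTracker, int datasetId) {
            int active = opTracker.getNumActiveOperations();
            if (active != 0) {
                throw new IllegalStateException("can't flush dataset " + datasetId
                        + ": " + active + " operation(s) still active");
            }
        }
    }

  Accordingly, the tests now drain active operations (see
  waitForOperations in ComponentRollbackTest below) before
  calling flushDataset.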

Change-Id: I5aea71a87f149b01f6c7310867fc15b5a340b93c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2173
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Luo Chen <cl...@uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c5a0a197
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c5a0a197
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c5a0a197

Branch: refs/heads/master
Commit: c5a0a1974d36d647a22d606d53bdaafd85f641df
Parents: f9e6bae
Author: Abdullah Alamoudi <bamou...@gmail.com>
Authored: Mon Nov 27 11:53:30 2017 -0800
Committer: abdullah alamoudi <bamou...@gmail.com>
Committed: Mon Nov 27 13:11:06 2017 -0800

----------------------------------------------------------------------
 .../app/bootstrap/TestNodeController.java       | 213 ++++++-
 .../test/dataflow/ComponentRollbackTest.java    | 139 +++--
 .../asterix/test/dataflow/LogMarkerTest.java    |   2 +-
 .../dataflow/MultiPartitionLSMIndexTest.java    | 561 +++++++++++++++++--
 .../SearchCursorComponentSwitchTest.java        |  21 +-
 .../TestLsmBTreeResourceFactoryProvider.java    |   4 +
 .../TestLsmBtreeIoOpCallbackFactory.java        |  40 ++
 .../TestPrimaryIndexOperationTracker.java       |  68 +++
 ...TestPrimaryIndexOperationTrackerFactory.java |  68 +++
 .../asterix/test/logging/CheckpointingTest.java |   2 +-
 .../asterix/test/storage/DiskIsFullTest.java    |   2 +-
 .../multi-partition-test-configuration.xml      | 112 ++++
 .../common/context/DatasetLifecycleManager.java |  25 +-
 .../lsm/common/api/ILSMIOOperationCallback.java |   1 +
 .../am/lsm/btree/impl/ITestOpCallback.java      |   6 +-
 .../btree/impl/IVirtualBufferCacheCallback.java |  23 +
 .../storage/am/lsm/btree/impl/TestLsmBtree.java | 182 +++++-
 .../btree/impl/TestLsmBtreeLocalResource.java   |   7 +
 .../btree/impl/TestLsmBtreeSearchCursor.java    |   5 +-
 .../lsm/btree/impl/TestVirtualBufferCache.java  | 215 +++++++
 20 files changed, 1539 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 72a9b44..352a5f8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -69,6 +69,8 @@ import org.apache.asterix.transaction.management.service.logging.LogReader;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -89,6 +91,7 @@ import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
@@ -166,33 +169,93 @@ public class TestNodeController {
     public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
             Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
             int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
-            StorageComponentProvider storageComponentProvider)
+            StorageComponentProvider storageComponentProvider, Index secondaryIndex)
             throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
-                DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
-                mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
-        IndexOperation op = IndexOperation.INSERT;
-        IModificationOperationCallbackFactory modOpCallbackFactory =
-                new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), dataset.getDatasetId(),
-                        primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
-                        ResourceType.LSM_BTREE);
-        IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
-        IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
-                storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
-        LSMInsertDeleteOperatorNodePushable insertOp =
-                new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
-                        primaryIndexInfo.primaryIndexInsertFieldsPermutations,
-                        recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0),
-                        op, true, indexHelperFactory, modOpCallbackFactory, null);
-        CommitRuntime commitOp =
-                new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), primaryIndexInfo.primaryKeyIndexes,
-                        true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
-        insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
-        commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
-        return Pair.of(insertOp, commitOp);
+        CcApplicationContext appCtx =
+                (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+        MetadataProvider mdProvider = new MetadataProvider(appCtx, null);
+        try {
+            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                    mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
+            IndexOperation op = IndexOperation.INSERT;
+            IModificationOperationCallbackFactory modOpCallbackFactory =
+                    new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), dataset.getDatasetId(),
+                            primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
+                            ResourceType.LSM_BTREE);
+            IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
+            RecordDescriptor recordDesc =
+                    recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0);
+            IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
+                    storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+            LSMInsertDeleteOperatorNodePushable insertOp =
+                    new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                            primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, op, true,
+                            indexHelperFactory, modOpCallbackFactory, null);
+
+            // For now, this assumes a single secondary index. recordDesc is always <pk-record-meta>.
+            // For the secondary index, we create an assign operator that extracts the secondary key,
+            // followed by the secondary-index LSMInsertDeleteOperatorNodePushable.
+            if (secondaryIndex != null) {
+                List<List<String>> skNames = secondaryIndex.getKeyFieldNames();
+                List<Integer> indicators = secondaryIndex.getKeyFieldSourceIndicators();
+                IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories =
+                        new IScalarEvaluatorFactory[skNames.size()];
+                for (int i = 0; i < skNames.size(); i++) {
+                    ARecordType sourceType = dataset.hasMetaPart()
+                            ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordType : metaType
+                            : recordType;
+                    int pos = skNames.get(i).size() > 1 ? -1 : sourceType.getFieldIndex(skNames.get(i).get(0));
+                    secondaryFieldAccessEvalFactories[i] = mdProvider.getDataFormat().getFieldAccessEvaluatorFactory(
+                            mdProvider.getFunctionManager(), sourceType, secondaryIndex.getKeyFieldNames().get(i), pos);
+                }
+                // outColumns are computed inside the assign runtime
+                int[] outColumns = new int[skNames.size()];
+                // the projection list includes the new secondary keys and the primary keys
+                int[] projectionList = new int[skNames.size() + primaryIndexInfo.index.getKeyFieldNames().size()];
+                for (int i = 0; i < secondaryFieldAccessEvalFactories.length; i++) {
+                    outColumns[i] = primaryIndexInfo.rDesc.getFieldCount() + i;
+                }
+                int projCount = 0;
+                for (int i = 0; i < secondaryFieldAccessEvalFactories.length; i++) {
+                    projectionList[projCount++] = primaryIndexInfo.rDesc.getFieldCount() + i;
+                }
+                for (int i = 0; i < primaryIndexInfo.index.getKeyFieldNames().size(); i++) {
+                    projectionList[projCount++] = i;
+                }
+                IPushRuntime assignOp =
+                        new AssignRuntimeFactory(outColumns, secondaryFieldAccessEvalFactories, projectionList, true)
+                                .createPushRuntime(ctx);
+                insertOp.setOutputFrameWriter(0, assignOp, primaryIndexInfo.rDesc);
+                assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
+                SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
+                IIndexDataflowHelperFactory secondaryIndexHelperFactory = new IndexDataflowHelperFactory(
+                        storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider);
+                LSMInsertDeleteOperatorNodePushable secondaryInsertOp =
+                        new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                                secondaryIndexInfo.insertFieldsPermutations, secondaryIndexInfo.rDesc, op, false,
+                                secondaryIndexHelperFactory, NoOpOperationCallbackFactory.INSTANCE, null);
+                assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc);
+                CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(),
+                        secondaryIndexInfo.primaryKeyIndexes, true, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                        true);
+                secondaryInsertOp.setOutputFrameWriter(0, commitOp, secondaryIndexInfo.rDesc);
+                commitOp.setInputRecordDescriptor(0, secondaryIndexInfo.rDesc);
+                return Pair.of(insertOp, commitOp);
+            } else {
+                CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(),
+                        primaryIndexInfo.primaryKeyIndexes, true, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                        true);
+                insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
+                commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
+                return Pair.of(insertOp, commitOp);
+            }
+        } finally {
+            mdProvider.getLocks().unlock();
+        }
     }
 
     public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset,
@@ -271,6 +334,34 @@ public class TestNodeController {
         return primaryIndexInfo;
     }
 
+    public SecondaryIndexInfo createSecondaryIndex(PrimaryIndexInfo primaryIndexInfo, Index secondaryIndex,
+            IStorageComponentProvider storageComponentProvider, int partition)
+            throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+                DatasetUtil.getMergePolicyFactory(primaryIndexInfo.dataset, mdTxnCtx);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        Dataverse dataverse = new Dataverse(primaryIndexInfo.dataset.getDataverseName(),
+                NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
+        MetadataProvider mdProvider = new MetadataProvider(
+                (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse);
+        SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
+        try {
+            IResourceFactory resourceFactory = primaryIndexInfo.dataset.getResourceFactory(mdProvider, secondaryIndex,
+                    primaryIndexInfo.recordType, primaryIndexInfo.metaType, mergePolicy.first, mergePolicy.second);
+            IndexBuilderFactory indexBuilderFactory =
+                    new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
+                            secondaryIndexInfo.fileSplitProvider, resourceFactory, true);
+            IHyracksTaskContext ctx = createTestContext(newJobId(), partition, false);
+            IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition);
+            indexBuilder.build();
+        } finally {
+            mdProvider.getLocks().unlock();
+        }
+        return secondaryIndexInfo;
+    }
+
     public static ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
             IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) {
         int i = 0;
@@ -286,6 +377,22 @@ public class TestNodeController {
         return primaryIndexSerdes;
     }
 
+    public static ISerializerDeserializer<?>[] createSecondaryIndexSerdes(ARecordType recordType, ARecordType metaType,
+            IAType[] primaryKeyTypes, IAType[] secondaryKeyTypes) {
+        ISerializerDeserializer<?>[] secondaryIndexSerdes =
+                new ISerializerDeserializer<?>[secondaryKeyTypes.length + primaryKeyTypes.length];
+        int i = 0;
+        for (; i < secondaryKeyTypes.length; i++) {
+            secondaryIndexSerdes[i] =
+                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(secondaryKeyTypes[i]);
+        }
+        for (; i < secondaryKeyTypes.length + primaryKeyTypes.length; i++) {
+            secondaryIndexSerdes[i] = SerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(primaryKeyTypes[i - secondaryKeyTypes.length]);
+        }
+        return secondaryIndexSerdes;
+    }
+
     public static ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
             ARecordType recordType, ARecordType metaType) {
         ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields];
@@ -300,6 +407,19 @@ public class TestNodeController {
         return primaryIndexTypeTraits;
     }
 
+    public static ITypeTraits[] createSecondaryIndexTypeTraits(ARecordType recordType, ARecordType metaType,
+            IAType[] primaryKeyTypes, IAType[] secondaryKeyTypes) {
+        ITypeTraits[] secondaryIndexTypeTraits = new ITypeTraits[secondaryKeyTypes.length + primaryKeyTypes.length];
+        int i = 0;
+        for (; i < secondaryKeyTypes.length; i++) {
+            secondaryIndexTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(secondaryKeyTypes[i]);
+        }
+        for (; i < secondaryKeyTypes.length + primaryKeyTypes.length; i++) {
+            secondaryIndexTypeTraits[i] =
+                    TypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i - secondaryKeyTypes.length]);
+        }
+        return secondaryIndexTypeTraits;
+    }
+
+
     public IHyracksTaskContext createTestContext(JobId jobId, int partition, boolean withMessaging)
             throws HyracksDataException {
         IHyracksTaskContext ctx = TestUtils.create(KB32);
@@ -337,7 +457,47 @@ public class TestNodeController {
         return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager();
     }
 
+    public static class SecondaryIndexInfo {
+        private int[] primaryKeyIndexes;
+        private PrimaryIndexInfo primaryIndexInfo;
+        private Index secondaryIndex;
+        private ConstantFileSplitProvider fileSplitProvider;
+        private RecordDescriptor rDesc;
+        private int[] insertFieldsPermutations;
+
+        public SecondaryIndexInfo(PrimaryIndexInfo primaryIndexInfo, Index secondaryIndex) {
+            this.primaryIndexInfo = primaryIndexInfo;
+            this.secondaryIndex = secondaryIndex;
+            List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
+            CcApplicationContext appCtx =
+                    (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+            FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(),
+                    primaryIndexInfo.dataset, secondaryIndex.getIndexName(), nodes);
+            fileSplitProvider = new ConstantFileSplitProvider(splits);
+            ITypeTraits[] secondaryIndexTypeTraits = createSecondaryIndexTypeTraits(primaryIndexInfo.recordType,
+                    primaryIndexInfo.metaType, primaryIndexInfo.primaryKeyTypes,
+                    secondaryIndex.getKeyFieldTypes().toArray(new IAType[secondaryIndex.getKeyFieldTypes().size()]));
+            ISerializerDeserializer<?>[] secondaryIndexSerdes = createSecondaryIndexSerdes(primaryIndexInfo.recordType,
+                    primaryIndexInfo.metaType, primaryIndexInfo.primaryKeyTypes,
+                    secondaryIndex.getKeyFieldTypes().toArray(new IAType[secondaryIndex.getKeyFieldTypes().size()]));
+            rDesc = new RecordDescriptor(secondaryIndexSerdes, secondaryIndexTypeTraits);
+            insertFieldsPermutations = new int[secondaryIndexTypeTraits.length];
+            for (int i = 0; i < insertFieldsPermutations.length; i++) {
+                insertFieldsPermutations[i] = i;
+            }
+            primaryKeyIndexes = new int[primaryIndexInfo.primaryKeyIndexes.length];
+            for (int i = 0; i < primaryKeyIndexes.length; i++) {
+                primaryKeyIndexes[i] = i + secondaryIndex.getKeyFieldNames().size();
+            }
+        }
+
+        public IFileSplitProvider getFileSplitProvider() {
+            return fileSplitProvider;
+        }
+    }
+
     public static class PrimaryIndexInfo {
+        private Dataset dataset;
         private IAType[] primaryKeyTypes;
         private ARecordType recordType;
         private ARecordType metaType;
@@ -356,6 +516,7 @@ public class TestNodeController {
                 ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
                 int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators)
                 throws AlgebricksException {
+            this.dataset = dataset;
             this.primaryKeyTypes = primaryKeyTypes;
             this.recordType = recordType;
             this.metaType = metaType;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index a239210..e923d93 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.test.dataflow;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -27,6 +28,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.function.Predicate;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
@@ -37,6 +39,7 @@ import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -67,9 +70,11 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
@@ -112,12 +117,24 @@ public class ComponentRollbackTest {
     private static IIndexDataflowHelper indexDataflowHelper;
     private static ITransactionContext txnCtx;
     private static LSMInsertDeleteOperatorNodePushable insertOp;
+    public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() {
+        @Override
+        public void before(Semaphore semaphore) {
+            semaphore.release();
+        }
+
+        @Override
+        public void after() {
+        }
+    };
 
     @BeforeClass
     public static void setUp() throws Exception {
         System.out.println("SetUp: ");
         TestHelper.deleteExistingInstanceFiles();
-        nc = new TestNodeController(null, false);
+        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+                + File.separator + "resources" + File.separator + "multi-partition-test-configuration.xml";
+        nc = new TestNodeController(configPath, false);
         nc.init();
         ncAppCtx = nc.getAppRuntimeContext();
         dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
@@ -153,7 +170,7 @@ public class ComponentRollbackTest {
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, storageManager).getLeft();
+                KEY_INDICATORS_LIST, storageManager, null).getLeft();
     }
 
     @After
@@ -162,10 +179,10 @@ public class ComponentRollbackTest {
     }
 
     static void allowAllOps(TestLsmBtree lsmBtree) {
-        lsmBtree.addModifyCallback(sem -> sem.release());
-        lsmBtree.addFlushCallback(sem -> sem.release());
-        lsmBtree.addSearchCallback(sem -> sem.release());
-        lsmBtree.addMergeCallback(sem -> sem.release());
+        lsmBtree.addModifyCallback(ALLOW_CALLBACK);
+        lsmBtree.addFlushCallback(ALLOW_CALLBACK);
+        lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+        lsmBtree.addMergeCallback(ALLOW_CALLBACK);
     }
 
     @Test
@@ -184,7 +201,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -222,6 +239,16 @@ public class ComponentRollbackTest {
         }
     }
 
+    public void flush(boolean async) throws Exception {
+        flush(dsLifecycleMgr, lsmBtree, dataset, async);
+    }
+
+    public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
+            boolean async) throws Exception {
+        waitForOperations(lsmBtree);
+        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), async);
+    }
+
     @Test
     public void testRollbackThenInsert() {
         try {
@@ -238,7 +265,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -267,7 +294,7 @@ public class ComponentRollbackTest {
             txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                     new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
             insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                    KEY_INDICATORS_LIST, storageManager).getLeft();
+                    KEY_INDICATORS_LIST, storageManager, null).getLeft();
             insertOp.open();
             for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
                 ITupleReference tuple = tupleGenerator.next();
@@ -311,7 +338,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -339,7 +366,7 @@ public class ComponentRollbackTest {
             lsmAccessor.deleteComponents(
                     c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
             // now that the rollback has completed, we will unblock the search
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // search now and ensure
@@ -359,7 +386,7 @@ public class ComponentRollbackTest {
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
             // now that the rollback has completed, we will unblock the search
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(secondSearcher.result());
             searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
@@ -385,7 +412,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -404,7 +431,7 @@ public class ComponentRollbackTest {
             // disable flushes
             lsmBtree.clearFlushCallbacks();
             Flusher firstFlusher = new Flusher(lsmBtree);
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+            flush(true);
             firstFlusher.waitUntilCount(1);
             // now that we entered, we will rollback. This will not proceed since it is waiting for the flush to complete
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
@@ -441,7 +468,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -505,7 +532,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -526,7 +553,7 @@ public class ComponentRollbackTest {
             lsmBtree.clearFlushCallbacks();
             lsmBtree.clearSearchCallbacks();
             Flusher firstFlusher = new Flusher(lsmBtree);
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+            flush(true);
             firstFlusher.waitUntilCount(1);
             Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enters the components
@@ -535,7 +562,7 @@ public class ComponentRollbackTest {
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             //unblock the flush
             lsmBtree.allowFlush(1);
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // ensure current mem component is not modified
@@ -565,7 +592,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -585,7 +612,7 @@ public class ComponentRollbackTest {
             // disable searches
             lsmBtree.clearFlushCallbacks();
             Flusher firstFlusher = new Flusher(lsmBtree);
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+            flush(true);
             firstFlusher.waitUntilCount(1);
             lsmBtree.clearSearchCallbacks();
             Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
@@ -594,7 +621,7 @@ public class ComponentRollbackTest {
             // now that we entered, we will rollback
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             // The rollback will be waiting for the flush to complete
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             //unblock the flush
@@ -627,7 +654,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -666,7 +693,7 @@ public class ComponentRollbackTest {
             // unblock the merge
             lsmBtree.allowMerge(1);
             // unblock the search
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             rollerback.complete();
@@ -698,7 +725,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -734,7 +761,7 @@ public class ComponentRollbackTest {
             // now that we entered, we will rollback
             Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
             // unblock the search
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // even though rollback has been called, it is still waiting for the merge to complete
@@ -790,10 +817,18 @@ public class ComponentRollbackTest {
 
         public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
                 TestLsmBtree lsmBtree, int numOfRecords) {
-            lsmBtree.addSearchCallback(sem -> {
-                synchronized (Searcher.this) {
-                    entered = true;
-                    Searcher.this.notifyAll();
+            lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore sem) {
+                    synchronized (Searcher.this) {
+                        entered = true;
+                        Searcher.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
                 }
             });
             Callable<Boolean> callable = new Callable<Boolean>() {
@@ -821,10 +856,18 @@ public class ComponentRollbackTest {
         private volatile int count = 0;
 
         public Merger(TestLsmBtree lsmBtree) {
-            lsmBtree.addMergeCallback(sem -> {
-                synchronized (Merger.this) {
-                    count++;
-                    Merger.this.notifyAll();
+            lsmBtree.addMergeCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore semaphore) {
+                    synchronized (Merger.this) {
+                        count++;
+                        Merger.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
                 }
             });
         }
@@ -840,10 +883,18 @@ public class ComponentRollbackTest {
         private volatile int count = 0;
 
         public Flusher(TestLsmBtree lsmBtree) {
-            lsmBtree.addFlushCallback(sem -> {
-                synchronized (Flusher.this) {
-                    count++;
-                    Flusher.this.notifyAll();
+            lsmBtree.addFlushCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore semaphore) {
+                    synchronized (Flusher.this) {
+                        count++;
+                        Flusher.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
                 }
             });
         }
@@ -889,6 +940,20 @@ public class ComponentRollbackTest {
         Assert.assertEquals(numOfRecords, countOp.getCount());
     }
 
+    public static void waitForOperations(ILSMIndex index) throws InterruptedException {
+        // wait until the number of active operations reaches 0
+        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+        long maxWaitTime = 60000L; // 1 minute
+        long before = System.currentTimeMillis();
+        while (opTracker.getNumActiveOperations() > 0) {
+            Thread.sleep(5); // NOSONAR: Test code with a timeout
+            if (System.currentTimeMillis() - before > maxWaitTime) {
+                throw new IllegalStateException(
+                        (System.currentTimeMillis() - before) + "ms passed without completing the frame operation");
+            }
+        }
+    }
+
+
     public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
             Collection<FrameWriterOperation> exceptionThrowingOperations,
             Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 963cded..0a968c8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -124,7 +124,7 @@ public class LogMarkerTest {
                 ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 2c8141ce..c12f50c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -18,18 +18,23 @@
  */
 package org.apache.asterix.test.dataflow;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
 import org.apache.asterix.app.data.gen.TupleGenerator;
 import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
@@ -44,14 +49,21 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.IVirtualBufferCacheCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -59,8 +71,18 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class MultiPartitionLSMIndexTest {
+    static final int REPEAT_TEST_COUNT = 1;
+
+    @Parameterized.Parameters
+    public static List<Object[]> data() {
+        return Arrays.asList(new Object[REPEAT_TEST_COUNT][0]);
+    }
+
     private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
     private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
             new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
@@ -73,30 +95,42 @@ public class MultiPartitionLSMIndexTest {
     private static final int[] KEY_INDEXES = { 0 };
     private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
     private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
-    private static final int TOTAL_NUM_OF_RECORDS = 10000;
-    private static final int RECORDS_PER_COMPONENT = 1000;
+    private static final int TOTAL_NUM_OF_RECORDS = 5000;
+    private static final int RECORDS_PER_COMPONENT = 500;
     private static final int DATASET_ID = 101;
     private static final String DATAVERSE_NAME = "TestDV";
     private static final String DATASET_NAME = "TestDS";
+    private static final String INDEX_NAME = "TestIdx";
     private static final String DATA_TYPE_NAME = "DUMMY";
     private static final String NODE_GROUP_NAME = "DEFAULT";
+    private static final IndexType INDEX_TYPE = IndexType.BTREE;
+    private static final List<List<String>> INDEX_FIELD_NAMES =
+            Arrays.asList(Arrays.asList(RECORD_TYPE.getFieldNames()[1]));
+    private static final List<Integer> INDEX_FIELD_INDICATORS = Arrays.asList(Index.RECORD_INDICATOR);
+    private static final List<IAType> INDEX_FIELD_TYPES = Arrays.asList(BuiltinType.AINT64);
     private static final StorageComponentProvider storageManager = new StorageComponentProvider();
     private static final int NUM_PARTITIONS = 2;
     private static TestNodeController nc;
     private static NCAppRuntimeContext ncAppCtx;
     private static IDatasetLifecycleManager dsLifecycleMgr;
     private static Dataset dataset;
+    private static Index secondaryIndex;
     private static ITransactionContext txnCtx;
-    private static TestLsmBtree[] primarylsmBtrees;
+    private static TestLsmBtree[] primaryLsmBtrees;
+    private static TestLsmBtree[] secondaryLsmBtrees;
     private static IHyracksTaskContext[] taskCtxs;
-    private static IIndexDataflowHelper[] indexDataflowHelpers;
+    private static IIndexDataflowHelper[] primaryIndexDataflowHelpers;
+    private static IIndexDataflowHelper[] secondaryIndexDataflowHelpers;
     private static LSMInsertDeleteOperatorNodePushable[] insertOps;
+    private static Actor[] actors;
 
     @BeforeClass
     public static void setUp() throws Exception {
         System.out.println("SetUp: ");
         TestHelper.deleteExistingInstanceFiles();
-        nc = new TestNodeController(null, false);
+        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+                + File.separator + "resources" + File.separator + "multi-partition-test-configuration.xml";
+        nc = new TestNodeController(configPath, false);
         nc.init();
         ncAppCtx = nc.getAppRuntimeContext();
         dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
@@ -113,84 +147,515 @@ public class MultiPartitionLSMIndexTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
-                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                        partitioningKeys, null, null, null, false, null),
-                null, DatasetType.INTERNAL, DATASET_ID, 0);
+        dataset =
+                new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                        NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                                partitioningKeys, null, null, null, false, null),
+                        null, DatasetType.INTERNAL, DATASET_ID, 0);
+        secondaryIndex = new Index(DATAVERSE_NAME, DATASET_NAME, INDEX_NAME, INDEX_TYPE, INDEX_FIELD_NAMES,
+                INDEX_FIELD_INDICATORS, INDEX_FIELD_TYPES, false, false, false, 0);
         taskCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
-        indexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
-        primarylsmBtrees = new TestLsmBtree[NUM_PARTITIONS];
+        primaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+        secondaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+        primaryLsmBtrees = new TestLsmBtree[NUM_PARTITIONS];
+        secondaryLsmBtrees = new TestLsmBtree[NUM_PARTITIONS];
         insertOps = new LSMInsertDeleteOperatorNodePushable[NUM_PARTITIONS];
         JobId jobId = nc.newJobId();
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+        actors = new Actor[NUM_PARTITIONS];
         for (int i = 0; i < taskCtxs.length; i++) {
             taskCtxs[i] = nc.createTestContext(jobId, i, false);
             PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                     storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, i);
+            SecondaryIndexInfo secondaryIndexInfo =
+                    nc.createSecondaryIndex(primaryIndexInfo, secondaryIndex, storageManager, i);
             IndexDataflowHelperFactory iHelperFactory =
                     new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
-            indexDataflowHelpers[i] = iHelperFactory.create(taskCtxs[i].getJobletContext().getServiceContext(), i);
-            indexDataflowHelpers[i].open();
-            primarylsmBtrees[i] = (TestLsmBtree) indexDataflowHelpers[i].getIndexInstance();
-            indexDataflowHelpers[i].close();
+            primaryIndexDataflowHelpers[i] =
+                    iHelperFactory.create(taskCtxs[i].getJobletContext().getServiceContext(), i);
+            primaryIndexDataflowHelpers[i].open();
+            primaryLsmBtrees[i] = (TestLsmBtree) primaryIndexDataflowHelpers[i].getIndexInstance();
+            iHelperFactory =
+                    new IndexDataflowHelperFactory(nc.getStorageManager(), secondaryIndexInfo.getFileSplitProvider());
+            secondaryIndexDataflowHelpers[i] =
+                    iHelperFactory.create(taskCtxs[i].getJobletContext().getServiceContext(), i);
+            secondaryIndexDataflowHelpers[i].open();
+            secondaryLsmBtrees[i] = (TestLsmBtree) secondaryIndexDataflowHelpers[i].getIndexInstance();
+            secondaryIndexDataflowHelpers[i].close();
+            primaryIndexDataflowHelpers[i].close();
             insertOps[i] = nc.getInsertPipeline(taskCtxs[i], dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                    KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+                    KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex).getLeft();
+            actors[i] = new Actor("player-" + i, i);
+        }
+        // allow all operations
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            ComponentRollbackTest.allowAllOps(primaryLsmBtrees[i]);
+            ComponentRollbackTest.allowAllOps(secondaryLsmBtrees[i]);
+            actors[i].add(new Request(Request.Action.INSERT_OPEN));
         }
     }
 
     @After
     public void destroyIndex() throws Exception {
-        for (IIndexDataflowHelper indexDataflowHelper : indexDataflowHelpers) {
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            Request close = new Request(Request.Action.INSERT_CLOSE);
+            actors[i].add(close);
+            close.await();
+        }
+        nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+        for (IIndexDataflowHelper indexDataflowHelper : secondaryIndexDataflowHelpers) {
+            indexDataflowHelper.destroy();
+        }
+        for (IIndexDataflowHelper indexDataflowHelper : primaryIndexDataflowHelpers) {
             indexDataflowHelper.destroy();
         }
+        for (Actor actor : actors) {
+            actor.stop();
+        }
     }
 
     @Test
     public void testFlushOneFullOneEmpty() {
         try {
-            // allow all operations
-            for (int i = 0; i < NUM_PARTITIONS; i++) {
-                ComponentRollbackTest.allowAllOps(primarylsmBtrees[i]);
+            int totalNumOfComponents = TOTAL_NUM_OF_RECORDS / 
RECORDS_PER_COMPONENT;
+            for (int j = 0; j < totalNumOfComponents; j++) {
+                actors[0].add(new Request(Request.Action.INSERT_PATCH));
+                actors[0].add(new Request(Request.Action.FLUSH_DATASET));
             }
-
-            insertOps[0].open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, 
META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, 
META_GEN_FUNCTION, UNIQUE_META_FIELDS);
-            VSizeFrame frame = new VSizeFrame(taskCtxs[0]);
-            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            int numFlushes = 0;
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == (RECORDS_PER_COMPONENT - 1) 
&& j + 1 != TOTAL_NUM_OF_RECORDS) {
-                    if (tupleAppender.getTupleCount() > 0) {
-                        tupleAppender.write(insertOps[0], true);
-                    }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
-                    numFlushes++;
-                }
-                ITupleReference tuple = tupleGenerator.next();
-                DataflowUtils.addTupleToFrame(tupleAppender, tuple, 
insertOps[0]);
-            }
-            if (tupleAppender.getTupleCount() > 0) {
-                tupleAppender.write(insertOps[0], true);
-            }
-            insertOps[0].close();
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
-            numFlushes++;
-            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+            ensureDone(actors[0]);
             // search now and ensure partition 0 has all the records
             ComponentRollbackTest.searchAndAssertCount(nc, 0, dataset, 
storageManager, TOTAL_NUM_OF_RECORDS);
             // and that partition 1 has no records
             ComponentRollbackTest.searchAndAssertCount(nc, 1, dataset, 
storageManager, 0);
-            // and that partition 0 has numFlushes disk components
-            Assert.assertEquals(numFlushes, primarylsmBtrees[0].getDiskComponents().size());
+            // and that partition 0 has totalNumOfComponents disk components
+            Assert.assertEquals(totalNumOfComponents, primaryLsmBtrees[0].getDiskComponents().size());
             // and that partition 1 has no disk components
-            Assert.assertEquals(0, primarylsmBtrees[1].getDiskComponents().size());
+            Assert.assertEquals(0, primaryLsmBtrees[1].getDiskComponents().size());
+            // and that in partition 0, all secondary components have a corresponding primary
+            List<ILSMDiskComponent> secondaryDiskComponents = secondaryLsmBtrees[0].getDiskComponents();
+            List<ILSMDiskComponent> primaryDiskComponents = primaryLsmBtrees[0].getDiskComponents();
+            for (int i = 0; i < secondaryDiskComponents.size(); i++) {
+                Assert.assertEquals(secondaryDiskComponents.get(i).getId(), primaryDiskComponents.get(i).getId());
+            }
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
         }
     }
 
+    private void ensureDone(Actor actor) throws InterruptedException {
+        Request req = new Request(Request.Action.DUMMY);
+        actor.add(req);
+        req.await();
+    }
+
+    /**
+     * This test updates partition 0, schedules a flush, and modifies partition 1 while
+     * the flush is being scheduled. It then ensures that in partition 1, the primary and
+     * secondary memory components end up with the same component id.
+     */
+    @Test
+    public void testAllocateWhileFlushIsScheduled() {
+        try {
+            // we want to know when the vbc becomes full
+            AtomicBoolean isFull = new AtomicBoolean(false);
+            MutableBoolean proceedToScheduleFlush = new MutableBoolean(false);
+            primaryLsmBtrees[0].addVirtuablBufferCacheCallback(new IVirtualBufferCacheCallback() {
+                @Override
+                public void isFullChanged(boolean newValue) {
+                    synchronized (isFull) {
+                        isFull.set(newValue);
+                        isFull.notifyAll();
+                    }
+                    synchronized (proceedToScheduleFlush) {
+                        while (!proceedToScheduleFlush.booleanValue()) {
+                            try {
+                                proceedToScheduleFlush.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+            });
+            Request insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[0].add(insertReq);
+            while (true) {
+                Thread.sleep(100);
+                if (insertReq.done) {
+                    // if done, then flush was not triggered and we need to insert a new patch
+                    insertReq = new Request(Request.Action.INSERT_PATCH);
+                    actors[0].add(insertReq);
+                } else if (isFull.get()) {
+                    break;
+                }
+            }
+
+            // now, we need to do two things: allocate the primary in partition 1 without
+            // letting it proceed, then allow partition 0 to proceed to schedule its flush
+            MutableBoolean proceedToAllocateSecondaryIndex = new MutableBoolean(false);
+            MutableBoolean allocated = new MutableBoolean(false);
+            primaryLsmBtrees[1].addAllocateCallback(new ITestOpCallback<Void>() {
+                @Override
+                public void before(Void t) {
+                    // Nothing
+                }
+
+                @Override
+                public void after() {
+                    synchronized (allocated) {
+                        allocated.setValue(true);
+                        allocated.notifyAll();
+                    }
+                    synchronized (proceedToAllocateSecondaryIndex) {
+                        while (!proceedToAllocateSecondaryIndex.booleanValue()) {
+                            try {
+                                proceedToAllocateSecondaryIndex.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+            });
+            insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[1].add(insertReq);
+            // Wait for the allocation to happen
+            synchronized (allocated) {
+                while (!allocated.booleanValue()) {
+                    allocated.wait();
+                }
+            }
+            // The memory component has been allocated. Now we allow the first actor to proceed to schedule the flush
+            MutableBoolean flushStarted = new MutableBoolean(false);
+            primaryLsmBtrees[0].addFlushCallback(new ITestOpCallback<Semaphore>() {
+                @Override
+                public void before(Semaphore t) {
+                    synchronized (flushStarted) {
+                        flushStarted.setValue(true);
+                        flushStarted.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
+                }
+            });
+            synchronized (proceedToScheduleFlush) {
+                proceedToScheduleFlush.setValue(true);
+                proceedToScheduleFlush.notifyAll();
+            }
+            // Now we need to know that the flush has been scheduled
+            synchronized (flushStarted) {
+                while (!flushStarted.booleanValue()) {
+                    flushStarted.wait();
+                }
+            }
+
+            // we now allow the allocation to proceed
+            synchronized (proceedToAllocateSecondaryIndex) {
+                proceedToAllocateSecondaryIndex.setValue(true);
+                proceedToAllocateSecondaryIndex.notifyAll();
+            }
+            // ensure the insert patch has completed
+            insertReq.await();
+            primaryLsmBtrees[1].clearAllocateCallbacks();
+            // check the ids of the memory components of partition 1;
+            // before the fix, primary and secondary could end up with different ids here
+            Assert.assertEquals(primaryLsmBtrees[1].getCurrentMemoryComponent().getId(),
+                    secondaryLsmBtrees[1].getCurrentMemoryComponent().getId());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRecycleWhileFlushIsScheduled() {
+        try {
+            Request insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[0].add(insertReq);
+            Request flushReq = new Request(Request.Action.FLUSH_DATASET);
+            actors[0].add(flushReq);
+            flushReq.await();
+            // ensure that index switched to second component
+            Assert.assertEquals(1, primaryLsmBtrees[0].getCurrentMemoryComponentIndex());
+            insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[0].add(insertReq);
+            flushReq = new Request(Request.Action.FLUSH_DATASET);
+            actors[0].add(flushReq);
+            flushReq.await();
+            // ensure we switched back to first component
+            Assert.assertEquals(0, primaryLsmBtrees[0].getCurrentMemoryComponentIndex());
+            // flush first component of partition 1
+            insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[1].add(insertReq);
+            flushReq = new Request(Request.Action.FLUSH_DATASET);
+            actors[1].add(flushReq);
+            flushReq.await();
+            // ensure partition 1 is now on second component
+            Assert.assertEquals(1, primaryLsmBtrees[1].getCurrentMemoryComponentIndex());
+            // now we want to control when schedule flush is executed
+            AtomicBoolean arrivedAtScheduleFlush = new AtomicBoolean(false);
+            AtomicBoolean finishedScheduleFlush = new AtomicBoolean(false);
+            MutableBoolean proceedToScheduleFlush = new MutableBoolean(false);
+            addOpTrackerCallback(primaryLsmBtrees[0], new ITestOpCallback<Void>() {
+                @Override
+                public void before(Void t) {
+                    synchronized (arrivedAtScheduleFlush) {
+                        arrivedAtScheduleFlush.set(true);
+                        arrivedAtScheduleFlush.notifyAll();
+                    }
+                    synchronized (proceedToScheduleFlush) {
+                        while (!proceedToScheduleFlush.booleanValue()) {
+                            try {
+                                proceedToScheduleFlush.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+
+                @Override
+                public void after() {
+                    synchronized (finishedScheduleFlush) {
+                        finishedScheduleFlush.set(true);
+                        finishedScheduleFlush.notifyAll();
+                    }
+                }
+            });
+            AtomicBoolean isFull = new AtomicBoolean(false);
+            MutableBoolean proceedAfterIsFullChanged = new MutableBoolean(false);
+            primaryLsmBtrees[1].addVirtuablBufferCacheCallback(new IVirtualBufferCacheCallback() {
+                @Override
+                public void isFullChanged(boolean newValue) {
+                    synchronized (isFull) {
+                        isFull.set(newValue);
+                        isFull.notifyAll();
+                    }
+                    synchronized (proceedAfterIsFullChanged) {
+                        while (!proceedAfterIsFullChanged.booleanValue()) {
+                            try {
+                                proceedAfterIsFullChanged.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+            });
+
+            // now we start adding records to partition 1 until a flush is triggered
+            insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[1].add(insertReq);
+            while (true) {
+                Thread.sleep(100);
+                if (insertReq.done) {
+                    // if done, then flush was not triggered and we need to insert a new patch
+                    insertReq = new Request(Request.Action.INSERT_PATCH);
+                    actors[1].add(insertReq);
+                } else if (isFull.get()) {
+                    break;
+                }
+            }
+            // Now we know that the vbc is full and a flush will be scheduled; we allow this to proceed
+            synchronized (proceedAfterIsFullChanged) {
+                proceedAfterIsFullChanged.setValue(true);
+                proceedAfterIsFullChanged.notifyAll();
+            }
+
+            // now we want to control the recycling of components in partition 0
+            MutableBoolean recycledPrimary = new MutableBoolean(false);
+            MutableBoolean proceedAfterRecyclePrimary = new MutableBoolean(false);
+            ITestOpCallback<ILSMMemoryComponent> primaryRecycleCallback = new ITestOpCallback<ILSMMemoryComponent>() {
+                @Override
+                public void before(ILSMMemoryComponent t) {
+                }
+
+                @Override
+                public void after() {
+                    synchronized (recycledPrimary) {
+                        recycledPrimary.setValue(true);
+                        recycledPrimary.notifyAll();
+                    }
+                    synchronized (proceedAfterRecyclePrimary) {
+                        while (!proceedAfterRecyclePrimary.booleanValue()) {
+                            try {
+                                proceedAfterRecyclePrimary.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+            };
+            primaryLsmBtrees[0].addIoRecycleCallback(primaryRecycleCallback);
+
+            MutableBoolean arrivedToRecycleSecondary = new MutableBoolean(false);
+            MutableBoolean proceedToRecycleSecondary = new MutableBoolean(false);
+            ITestOpCallback<ILSMMemoryComponent> secondaryRecycleCallback = new ITestOpCallback<ILSMMemoryComponent>() {
+                @Override
+                public void before(ILSMMemoryComponent t) {
+                    synchronized (arrivedToRecycleSecondary) {
+                        arrivedToRecycleSecondary.setValue(true);
+                        arrivedToRecycleSecondary.notifyAll();
+                    }
+                    synchronized (proceedToRecycleSecondary) {
+                        while (!proceedToRecycleSecondary.booleanValue()) {
+                            try {
+                                proceedToRecycleSecondary.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+
+                @Override
+                public void after() {
+                }
+            };
+            secondaryLsmBtrees[0].addIoRecycleCallback(secondaryRecycleCallback);
+            // we first ensure that schedule flush arrived
+            synchronized (arrivedAtScheduleFlush) {
+                while (!arrivedAtScheduleFlush.get()) {
+                    arrivedAtScheduleFlush.wait();
+                }
+            }
+
+            // we insert a single frame into partition 0
+            insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[0].add(insertReq);
+            // wait until component has been recycled
+            synchronized (recycledPrimary) {
+                while (!recycledPrimary.booleanValue()) {
+                    recycledPrimary.wait();
+                }
+            }
+            synchronized (proceedAfterRecyclePrimary) {
+                proceedAfterRecyclePrimary.setValue(true);
+                proceedAfterRecyclePrimary.notifyAll();
+            }
+            // now that the primary has been recycled and allowed to proceed,
+            // we allow the scheduleFlush to proceed
+            synchronized (proceedToScheduleFlush) {
+                proceedToScheduleFlush.setValue(true);
+                proceedToScheduleFlush.notifyAll();
+            }
+            // wait until scheduleFlush completes
+            synchronized (finishedScheduleFlush) {
+                while (!finishedScheduleFlush.get()) {
+                    finishedScheduleFlush.wait();
+                }
+            }
+            // allow recycling of secondary
+            synchronized (proceedToRecycleSecondary) {
+                proceedToRecycleSecondary.setValue(true);
+                proceedToRecycleSecondary.notifyAll();
+            }
+            // ensure that the insert completes
+            insertReq.await();
+            // ensure the two memory components at partition 0 have the same component id
+            Assert.assertEquals(primaryLsmBtrees[0].getCurrentMemoryComponent().getId(),
+                    secondaryLsmBtrees[0].getCurrentMemoryComponent().getId());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    private static class Request {
+        enum Action {
+            DUMMY,
+            INSERT_OPEN,
+            INSERT_PATCH,
+            FLUSH_DATASET,
+            INSERT_CLOSE
+        }
+
+        private final Action action;
+        private volatile boolean done;
+
+        public Request(Action action) {
+            this.action = action;
+            done = false;
+        }
+
+        synchronized void complete() {
+            done = true;
+            notifyAll();
+        }
+
+        synchronized void await() throws InterruptedException {
+            while (!done) {
+                wait();
+            }
+        }
+
+    }
+
+    public class Actor extends SingleThreadEventProcessor<Request> {
+        private final int partition;
+        private final TupleGenerator tupleGenerator;
+        private final VSizeFrame frame;
+        private final FrameTupleAppender tupleAppender;
+
+        public Actor(String name, int partition) throws HyracksDataException {
+            super(name);
+            this.partition = partition;
+            tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            frame = new VSizeFrame(taskCtxs[partition]);
+            tupleAppender = new FrameTupleAppender(frame);
+        }
+
+        @Override
+        protected void handle(Request req) throws Exception {
+            try {
+                switch (req.action) {
+                    case FLUSH_DATASET:
+                        if (tupleAppender.getTupleCount() > 0) {
+                            tupleAppender.write(insertOps[partition], true);
+                        }
+                        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                        break;
+                    case INSERT_CLOSE:
+                        insertOps[partition].close();
+                        break;
+                    case INSERT_OPEN:
+                        insertOps[partition].open();
+                        break;
+                    case INSERT_PATCH:
+                        for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+                            ITupleReference tuple = tupleGenerator.next();
+                            DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOps[partition]);
+                        }
+                        if (tupleAppender.getTupleCount() > 0) {
+                            tupleAppender.write(insertOps[partition], true);
+                        }
+                        ComponentRollbackTest.waitForOperations(primaryLsmBtrees[partition]);
+                        break;
+                    default:
+                        break;
+                }
+            } finally {
+                req.complete();
+            }
+        }
+    }
+
+    private static void addOpTrackerCallback(TestLsmBtree lsmBtree, ITestOpCallback<Void> callback) {
+        if (!lsmBtree.isPrimaryIndex()) {
+            throw new IllegalArgumentException("Can only add callbacks to primary opTracker");
+        }
+        TestPrimaryIndexOperationTracker opTracker = (TestPrimaryIndexOperationTracker) lsmBtree.getOperationTracker();
+        opTracker.addCallback(callback);
+    }
 }
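
The handshake these tests rely on is always the same: a callback thread announces an
event under one monitor, then parks on a second monitor until the test thread releases
it. Below is a minimal sketch of that gate, assuming the MutableBoolean in the tests is
org.apache.commons.lang3.mutable.MutableBoolean; the Gate class name is illustrative,
not part of the patch:

    import org.apache.commons.lang3.mutable.MutableBoolean;

    final class Gate {
        private final MutableBoolean opened = new MutableBoolean(false);

        // test thread: release whoever is parked in await()
        void open() {
            synchronized (opened) {
                opened.setValue(true);
                opened.notifyAll();
            }
        }

        // callback thread: block until the test thread calls open()
        void await() {
            synchronized (opened) {
                while (!opened.booleanValue()) {
                    try {
                        opened.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }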

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index b65ba03..652616a 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -121,10 +121,11 @@ public class SearchCursorComponentSwitchTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
-                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                        partitioningKeys, null, null, null, false, null),
-                null, DatasetType.INTERNAL, DATASET_ID, 0);
+        dataset =
+                new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                        NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                                partitioningKeys, null, null, null, false, null),
+                        null, DatasetType.INTERNAL, DATASET_ID, 0);
        PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                 storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
         IndexDataflowHelperFactory iHelperFactory =
@@ -138,7 +139,7 @@ public class SearchCursorComponentSwitchTest {
        txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
        insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, storageManager).getLeft();
+                KEY_INDICATORS_LIST, storageManager, null).getLeft();
     }
 
     @After
@@ -147,7 +148,7 @@ public class SearchCursorComponentSwitchTest {
     }
 
     void unblockSearch(TestLsmBtree lsmBtree) {
-        lsmBtree.addSearchCallback(sem -> sem.release());
+        lsmBtree.addSearchCallback(ComponentRollbackTest.ALLOW_CALLBACK);
         lsmBtree.allowSearch(1);
     }
 
@@ -170,7 +171,7 @@ public class SearchCursorComponentSwitchTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -183,7 +184,7 @@ public class SearchCursorComponentSwitchTest {
            firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+            ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // unblock the search
             unblockSearch(lsmBtree);
@@ -216,7 +217,7 @@ public class SearchCursorComponentSwitchTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -229,7 +230,7 @@ public class SearchCursorComponentSwitchTest {
            firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+            ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // merge all components
            ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);

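The flushDataset call sites above were rerouted through ComponentRollbackTest.flush
because, per the commit message, DatasetLifecycleManager.flushDataset now refuses to
flush while operations are in flight. A hypothetical sketch of that guard follows; the
getNumActiveOperations helper and the message text are assumptions, not the actual
implementation:

    public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
        // assumed helper: counts in-flight modifications tracked by the primary op tracker
        int activeOps = getNumActiveOperations(datasetId);
        if (activeOps != 0) {
            throw new IllegalStateException(
                    "flushDataset called on dataset " + datasetId + " with " + activeOps + " active operations");
        }
        // ... schedule or perform the flush as before ...
    }
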
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
index 7268296..8d37878 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
@@ -33,6 +33,7 @@ import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
@@ -68,6 +69,9 @@ public class TestLsmBTreeResourceFactoryProvider implements IResourceFactoryProv
         int[] bloomFilterFields = getBloomFilterFields(dataset, index);
         double bloomFilterFalsePositiveRate = mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate();
         ILSMOperationTrackerFactory opTrackerFactory = dataset.getIndexOperationTrackerFactory(index);
+        if (opTrackerFactory instanceof PrimaryIndexOperationTrackerFactory) {
+            opTrackerFactory = new TestPrimaryIndexOperationTrackerFactory(dataset.getDatasetId());
+        }
         ILSMIOOperationCallbackFactory ioOpCallbackFactory = dataset.getIoOperationCallbackFactory(index);
         IStorageManager storageManager = storageComponentProvider.getStorageManager();
         IMetadataPageManagerFactory metadataPageManagerFactory =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index ddcb5b5..fea6cd8 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -18,14 +18,20 @@
  */
 package org.apache.asterix.test.dataflow;
 
+import java.util.List;
+
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
 
 public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallbackFactory {
@@ -92,12 +98,31 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback
     }
 
     public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback {
+        private final TestLsmBtree lsmBtree;
+
         public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
             super(index, idGenerator);
+            lsmBtree = (TestLsmBtree) index;
+        }
+
+        @Override
+        public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
+            lsmBtree.beforeIoOperationCalled();
+            super.beforeOperation(opType);
+            lsmBtree.beforeIoOperationReturned();
+        }
+
+        @Override
+        public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
+                ILSMDiskComponent newComponent) throws HyracksDataException {
+            lsmBtree.afterIoOperationCalled();
+            super.afterOperation(opType, oldComponents, newComponent);
+            lsmBtree.afterIoOperationReturned();
         }
 
         @Override
         public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+            lsmBtree.afterIoFinalizeCalled();
             super.afterFinalize(opType, newComponent);
             synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
                 if (newComponent != null) {
@@ -119,6 +144,21 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback
                 }
                 TestLsmBtreeIoOpCallbackFactory.this.notifyAll();
             }
+            lsmBtree.afterIoFinalizeReturned();
+        }
+
+        @Override
+        public void recycled(ILSMMemoryComponent component, boolean advance) throws HyracksDataException {
+            lsmBtree.recycledCalled(component);
+            super.recycled(component, advance);
+            lsmBtree.recycledReturned(component);
+        }
+
+        @Override
+        public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
+            lsmBtree.allocatedCalled(component);
+            super.allocated(component);
+            lsmBtree.allocatedReturned(component);
         }
 
         private void recordFailure(LSMIOOperationType opType) {

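The Called/Returned pairs above let TestLsmBtree run registered test hooks on the I/O
thread at precise points around each callback. A small usage sketch against the
callback API visible in this patch; the FlushWatcher helper and the Semaphore
signalling are illustrative, not part of the change:

    import java.util.concurrent.Semaphore;

    import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
    import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;

    final class FlushWatcher {
        // register a hook that signals the test thread when a flush is about to run
        static void watchFlush(TestLsmBtree btree, Semaphore flushStarted) {
            btree.addFlushCallback(new ITestOpCallback<Semaphore>() {
                @Override
                public void before(Semaphore s) {
                    flushStarted.release(); // the flush thread reached the flush entry point
                }

                @Override
                public void after() {
                    // nothing to do once the flush has gone through
                }
            });
        }
    }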